From 3f56dfe48477677206b01e303084ab355ee4a077 Mon Sep 17 00:00:00 2001 From: Kelly McDonald Date: Thu, 10 Dec 2020 10:46:23 -0500 Subject: [PATCH] Move all workflow sync stuff into new file Make changes to api naming scheme add some error checking around endpoints for missing/invalid endpoints --- crc/api.yml | 52 +++--- crc/api/workflow.py | 305 ----------------------------------- crc/api/workflow_sync.py | 336 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 362 insertions(+), 331 deletions(-) create mode 100644 crc/api/workflow_sync.py diff --git a/crc/api.yml b/crc/api.yml index 78436178..41e76f90 100644 --- a/crc/api.yml +++ b/crc/api.yml @@ -100,9 +100,9 @@ paths: type: array items: $ref: "#/components/schemas/Study" - /workflow_spec/pullall: + /workflow_sync/pullall: get: - operationId: crc.api.workflow.sync_all_changed_workflows + operationId: crc.api.workflow_sync.sync_all_changed_workflows summary: Sync all workflows that have changed on the remote side and provide a list of the results security: - ApiKeyAuth : [] @@ -115,7 +115,7 @@ paths: schema: type: string tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: An array of workflow specs that were synced from remote. @@ -130,9 +130,9 @@ paths: - /workflow_spec/diff: + /workflow_sync/diff: get: - operationId: crc.api.workflow.get_changed_workflows + operationId: crc.api.workflow_sync.get_changed_workflows summary: Provides a list of workflow that differ from remote and if it is new or not security : - ApiKeyAuth : [] @@ -145,7 +145,7 @@ paths: schema: type: string tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: An array of workflow specs, with last touched date and which one is most recent. @@ -157,9 +157,9 @@ paths: $ref: "#/components/schemas/WorkflowSpecDiffList" - /workflow_spec/{workflow_spec_id}/files: + /workflow_sync/{workflow_spec_id}/files: get: - operationId: crc.api.workflow.get_workflow_spec_files + operationId: crc.api.workflow_sync.get_workflow_spec_files summary: Provides a list of files for a workflow spec on this machine. security : - ApiKeyAuth : [] @@ -172,7 +172,7 @@ paths: type: string tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: An array of files for a workflow spec on the local system, with details. @@ -183,9 +183,9 @@ paths: items: $ref: "#/components/schemas/WorkflowSpecFilesList" - /workflow_spec/{workflow_spec_id}/files/sync: + /workflow_sync/{workflow_spec_id}/files/sync: get: - operationId: crc.api.workflow.sync_changed_files + operationId: crc.api.workflow_sync.sync_changed_files summary: Syncs files from a workflow on a remote system and provides a list of files that were updated security : - ApiKeyAuth : [] @@ -204,7 +204,7 @@ paths: type: string tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: A list of files that were synced for the workflow. @@ -217,9 +217,9 @@ paths: example : ["data_security_plan.dmn",'some_other_file.xml'] - /workflow_spec/{workflow_spec_id}/files/diff: + /workflow_sync/{workflow_spec_id}/files/diff: get: - operationId: crc.api.workflow.get_changed_files + operationId: crc.api.workflow_sync.get_changed_files summary: Provides a list of files for a workflow specs that differ from remote and their signature. 
security : - ApiKeyAuth : [] @@ -239,7 +239,7 @@ paths: type: string tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: An array of files that are different from remote, with last touched date and file signature. @@ -251,15 +251,15 @@ paths: $ref: "#/components/schemas/WorkflowSpecFilesDiff" - /workflow_spec/all: + /workflow_sync/all: get: - operationId: crc.api.workflow.get_all_spec_state + operationId: crc.api.workflow_sync.get_all_spec_state summary: Provides a list of workflow specs, last update date and thumbprint security: - ApiKeyAuth : [] tags: - - Workflow Spec States + - Workflow Sync API responses: '200': description: An array of workflow specs, with last touched date and file signature. @@ -547,7 +547,7 @@ paths: description: The workflow spec category has been removed. /file: parameters: - - name: workflow_spec_id + - name: workflow_sync_id in: query required: false description: The unique id of a workflow specification @@ -1353,7 +1353,7 @@ components: type : apiKey in : header name : X-CR-API-KEY - x-apikeyInfoFunc: crc.api.workflow.verify_token + x-apikeyInfoFunc: crc.api.workflow_sync.verify_token schemas: User: @@ -1380,7 +1380,7 @@ components: type: string WorkflowSpecDiffList: properties: - workflow_spec_id: + workflow_sync_id: type: string example : top_level_workflow date_created : @@ -1397,7 +1397,7 @@ components: file_model_id: type : integer example : 171 - workflow_spec_id : + workflow_sync_id : type: string example : top_level_workflow filename : @@ -1455,7 +1455,7 @@ components: WorkflowSpecAll: properties: - workflow_spec_id : + workflow_sync_id : type: string example : acaf1258-43b4-437e-8846-f612afa66811 date_created : @@ -1587,7 +1587,7 @@ components: content_type: type: string example: "application/xml" - workflow_spec_id: + workflow_sync_id: type: string example: "random_fact" x-nullable: true @@ -1609,7 +1609,7 @@ components: $ref: "#/components/schemas/NavigationItem" next_task: $ref: "#/components/schemas/Task" - workflow_spec_id: + workflow_sync_id: type: string spec_version: type: string @@ -1625,7 +1625,7 @@ components: example: id: 291234 status: 'user_input_required' - workflow_spec_id: 'random_fact' + workflow_sync_id: 'random_fact' spec_version: 'v1.1 [22,23]' is_latest_spec: True next_task: diff --git a/crc/api/workflow.py b/crc/api/workflow.py index 8da5c88a..d14daf11 100644 --- a/crc/api/workflow.py +++ b/crc/api/workflow.py @@ -1,11 +1,4 @@ -import hashlib -import json import uuid -from io import StringIO -from hashlib import md5 - -import pandas as pd -from SpiffWorkflow.util.deep_merge import DeepMerge from flask import g from crc import session, db, app from crc.api.common import ApiError, ApiErrorSchema @@ -21,8 +14,6 @@ from crc.services.study_service import StudyService from crc.services.user_service import UserService from crc.services.workflow_processor import WorkflowProcessor from crc.services.workflow_service import WorkflowService -from flask_cors import cross_origin -import requests def all_specifications(): schema = WorkflowSpecModelSchema(many=True) @@ -263,299 +254,3 @@ def _verify_user_and_role(processor, spiff_task): raise ApiError.from_task("permission_denied", f"This task must be completed by '{allowed_users}', " f"but you are {user.uid}", spiff_task) -def join_uuids(uuids): - """Joins a pandas Series of uuids and combines them in one hash""" - combined_uuids = ''.join([str(uuid) for uuid in uuids.sort_values()]) # ensure that values are always - # in the same order - return 
hashlib.md5(combined_uuids.encode('utf8')).hexdigest() # make a hash of the hashes - -def verify_token(token, required_scopes): - if token == app.config['API_TOKEN']: - return {'scope':['any']} - else: - raise ApiError("permission_denied","API Token information is not correct") - - -def get_changed_workflows(remote,as_df=False): - """ - gets a remote endpoint - gets the workflows and then - determines what workflows are different from the remote endpoint - """ - response = requests.get('http://'+remote+'/v1.0/workflow_spec/all',headers={'X-CR-API-KEY':app.config['API_TOKEN']}) - - # This is probably very and may allow cross site attacks - fix later - remote = pd.DataFrame(json.loads(response.text)) - # get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index - local = get_all_spec_state_dataframe().reset_index() - - # merge these on workflow spec id and hash - this will - # make two different date columns date_x and date_y - different = remote.merge(local, - right_on=['workflow_spec_id','md5_hash'], - left_on=['workflow_spec_id','md5_hash'], - how = 'outer' , - indicator=True).loc[lambda x : x['_merge']!='both'] - - # each line has a tag on it - if was in the left or the right, - # label it so we know if that was on the remote or local machine - different.loc[different['_merge']=='left_only','location'] = 'remote' - different.loc[different['_merge']=='right_only','location'] = 'local' - - # this takes the different date_created_x and date-created_y columns and - # combines them back into one date_created column - index = different['date_created_x'].isnull() - different.loc[index,'date_created_x'] = different[index]['date_created_y'] - different = different[['workflow_spec_id','date_created_x','location']].copy() - different.columns=['workflow_spec_id','date_created','location'] - - # our different list will have multiple entries for a workflow if there is a version on either side - # we want to grab the most recent one, so we sort and grab the most recent one for each workflow - changedfiles = different.sort_values('date_created',ascending=False).groupby('workflow_spec_id').first() - - # get an exclusive or list of workflow ids - that is we want lists of files that are - # on one machine or the other, but not both - remote_spec_ids = remote[['workflow_spec_id']] - local_spec_ids = local[['workflow_spec_id']] - left = remote_spec_ids[~remote_spec_ids['workflow_spec_id'].isin(local_spec_ids['workflow_spec_id'])] - right = local_spec_ids[~local_spec_ids['workflow_spec_id'].isin(remote_spec_ids['workflow_spec_id'])] - - # flag files as new that are only on the remote box and remove the files that are only on the local box - changedfiles['new'] = False - changedfiles.loc[changedfiles.index.isin(left['workflow_spec_id']), 'new'] = True - output = changedfiles[~changedfiles.index.isin(right['workflow_spec_id'])] - - # return the list as a dict, let swagger convert it to json - if as_df: - return output - else: - return output.reset_index().to_dict(orient='records') - - -def sync_all_changed_workflows(remote): - - workflowsdf = get_changed_workflows(remote,as_df=True) - workflows = workflowsdf.reset_index().to_dict(orient='records') - for workflow in workflows: - sync_changed_files(remote,workflow['workflow_spec_id']) - return [x['workflow_spec_id'] for x in workflows] - - -def sync_changed_files(remote,workflow_spec_id): - # make sure that spec is local before syncing files - remotespectext = 
requests.get('http://'+remote+'/v1.0/workflow-specification/'+workflow_spec_id, - headers={'X-CR-API-KEY': app.config['API_TOKEN']}) - specdict = json.loads(remotespectext.text) - localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first() - if localspec is None: - localspec = WorkflowSpecModel() - localspec.id = workflow_spec_id - if specdict['category'] == None: - localspec.category = None - else: - localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name - == specdict['category']['name']).first() - if localcategory == None: - #category doesn't exist - lets make it - localcategory = WorkflowSpecCategoryModel() - localcategory.name = specdict['category']['name'] - localcategory.display_name = specdict['category']['display_name'] - localcategory.display_order = specdict['category']['display_order'] - session.add(localcategory) - localspec.category = localcategory - - localspec.display_order = specdict['display_order'] - localspec.display_name = specdict['display_name'] - localspec.name = specdict['name'] - localspec.description = specdict['description'] - session.add(localspec) - - changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True) - if len(changedfiles)==0: - return [] - updatefiles = changedfiles[~((changedfiles['new']==True) & (changedfiles['location']=='local'))] - updatefiles = updatefiles.reset_index().to_dict(orient='records') - - deletefiles = changedfiles[((changedfiles['new']==True) & (changedfiles['location']=='local'))] - deletefiles = deletefiles.reset_index().to_dict(orient='records') - - for delfile in deletefiles: - currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id, - FileModel.name == delfile['filename']).first() - - # it is more appropriate to archive the file than delete - # due to the fact that we might have workflows that are using the - # file data - currentfile.archived = True - session.add(currentfile) - - for updatefile in updatefiles: - currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id, - FileModel.name == updatefile['filename']).first() - if not currentfile: - currentfile = FileModel() - currentfile.name = updatefile['filename'] - currentfile.workflow_spec_id = workflow_spec_id - - currentfile.date_created = updatefile['date_created'] - currentfile.type = updatefile['type'] - currentfile.primary = updatefile['primary'] - currentfile.content_type = updatefile['content_type'] - currentfile.primary_process_id = updatefile['primary_process_id'] - session.add(currentfile) - - response = requests.get('http://'+remote+'/v1.0/file/'+updatefile['md5_hash']+'/hash_data', - headers={'X-CR-API-KEY': app.config['API_TOKEN']}) - FileService.update_file(currentfile,response.content,updatefile['type']) - session.commit() - return [x['filename'] for x in updatefiles] - - -def get_changed_files(remote,workflow_spec_id,as_df=False): - """ - gets a remote endpoint - gets the files for a workflow_spec on both - local and remote and determines what files have been change and returns a list of those - files - """ - response = requests.get('http://'+remote+'/v1.0/workflow_spec/'+workflow_spec_id+'/files', - headers={'X-CR-API-KEY':app.config['API_TOKEN']}) - # This is probably very and may allow cross site attacks - fix later - remote = pd.DataFrame(json.loads(response.text)) - # get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index - local = 
get_workflow_spec_files_dataframe(workflow_spec_id).reset_index() - local['md5_hash'] = local['md5_hash'].astype('str') - different = remote.merge(local, - right_on=['filename','md5_hash'], - left_on=['filename','md5_hash'], - how = 'outer' , - indicator=True).loc[lambda x : x['_merge']!='both'] - if len(different) == 0: - if as_df: - return different - else: - return [] - # each line has a tag on it - if was in the left or the right, - # label it so we know if that was on the remote or local machine - different.loc[different['_merge']=='left_only','location'] = 'remote' - different.loc[different['_merge']=='right_only','location'] = 'local' - - # this takes the different date_created_x and date-created_y columns and - # combines them back into one date_created column - dualfields = ['date_created','type','primary','content_type','primary_process_id'] - for merge in dualfields: - index = different[merge+'_x'].isnull() - different.loc[index,merge+'_x'] = different[index][merge+'_y'] - - fieldlist = [fld+'_x' for fld in dualfields] - different = different[ fieldlist + ['md5_hash','filename','location']].copy() - - different.columns=dualfields+['md5_hash','filename','location'] - # our different list will have multiple entries for a workflow if there is a version on either side - # we want to grab the most recent one, so we sort and grab the most recent one for each workflow - changedfiles = different.sort_values('date_created',ascending=False).groupby('filename').first() - - # get an exclusive or list of workflow ids - that is we want lists of files that are - # on one machine or the other, but not both - remote_spec_ids = remote[['filename']] - local_spec_ids = local[['filename']] - left = remote_spec_ids[~remote_spec_ids['filename'].isin(local_spec_ids['filename'])] - right = local_spec_ids[~local_spec_ids['filename'].isin(remote_spec_ids['filename'])] - changedfiles['new'] = False - changedfiles.loc[changedfiles.index.isin(left['filename']), 'new'] = True - changedfiles.loc[changedfiles.index.isin(right['filename']),'new'] = True - changedfiles = changedfiles.replace({pd.np.nan: None}) - # return the list as a dict, let swagger convert it to json - if as_df: - return changedfiles - else: - return changedfiles.reset_index().to_dict(orient='records') - - - -def get_all_spec_state(): - """ - Return a list of all workflow specs along with last updated date and a - thumbprint of all of the files that are used for that workflow_spec - Convert into a dict list from a dataframe - """ - df = get_all_spec_state_dataframe() - return df.reset_index().to_dict(orient='records') - - -def get_workflow_spec_files(workflow_spec_id): - """ - Return a list of all workflow specs along with last updated date and a - thumbprint of all of the files that are used for that workflow_spec - Convert into a dict list from a dataframe - """ - df = get_workflow_spec_files_dataframe(workflow_spec_id) - return df.reset_index().to_dict(orient='records') - - -def get_workflow_spec_files_dataframe(workflowid): - """ - Return a list of all files for a workflow_spec along with last updated date and a - hash so we can determine file differences for a changed workflow on a box. 
-    Return a dataframe
-    """
-    x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid)
-    # there might be a cleaner way of getting a data frome from some of the
-    # fields in the ORM - but this works OK
-    filelist = []
-    for file in x:
-        filelist.append({'file_model_id':file.file_model_id,
-                         'workflow_spec_id': file.file_model.workflow_spec_id,
-                         'md5_hash':file.md5_hash,
-                         'filename':file.file_model.name,
-                         'type':file.file_model.type.name,
-                         'primary':file.file_model.primary,
-                         'content_type':file.file_model.content_type,
-                         'primary_process_id':file.file_model.primary_process_id,
-                         'date_created':file.date_created})
-    if len(filelist) == 0:
-        return pd.DataFrame(columns=['file_model_id',
-                                     'workflow_spec_id',
-                                     'md5_hash',
-                                     'filename',
-                                     'type',
-                                     'primary',
-                                     'content_type',
-                                     'primary_process_id',
-                                     'date_created'])
-    df = pd.DataFrame(filelist).sort_values('date_created').groupby('file_model_id').last()
-    df['date_created'] = df['date_created'].astype('str')
-    return df
-
-
-
-def get_all_spec_state_dataframe():
-    """
-    Return a list of all workflow specs along with last updated date and a
-    thumbprint of all of the files that are used for that workflow_spec
-    Return a dataframe
-    """
-    x = session.query(FileDataModel).join(FileModel)
-    # there might be a cleaner way of getting a data frome from some of the
-    # fields in the ORM - but this works OK
-    filelist = []
-    for file in x:
-        filelist.append({'file_model_id':file.file_model_id,
-                         'workflow_spec_id': file.file_model.workflow_spec_id,
-                         'md5_hash':file.md5_hash,
-                         'filename':file.file_model.name,
-                         'date_created':file.date_created})
-    df = pd.DataFrame(filelist)
-
-    # get a distinct list of file_model_id's with the most recent file_data retained
-    df = df.sort_values('date_created').drop_duplicates(['file_model_id'],keep='last').copy()
-
-    # take that list and then group by workflow_spec and retain the most recently touched file
-    # and make a consolidated hash of the md5_checksums - this acts as a 'thumbprint' for each
-    # workflow spec
-    df = df.groupby('workflow_spec_id').agg({'date_created':'max',
-                                             'md5_hash':join_uuids}).copy()
-    # get only the columns we are really interested in returning
-    df = df[['date_created','md5_hash']].copy()
-    # convert dates to string
-    df['date_created'] = df['date_created'].astype('str')
-    return df
-
diff --git a/crc/api/workflow_sync.py b/crc/api/workflow_sync.py
new file mode 100644
index 00000000..98f04236
--- /dev/null
+++ b/crc/api/workflow_sync.py
@@ -0,0 +1,336 @@
+import hashlib
+import json
+import pandas as pd
+import requests
+from crc import session, app
+from crc.api.common import ApiError
+from crc.models.file import FileModel, FileDataModel
+from crc.models.workflow import WorkflowSpecModel, WorkflowSpecCategoryModel
+from crc.services.file_service import FileService
+
+
+def join_uuids(uuids):
+    """Joins a pandas Series of uuids and combines them in one hash"""
+    combined_uuids = ''.join([str(uuid) for uuid in uuids.sort_values()])  # ensure that values are always
+                                                                           # in the same order
+    return hashlib.md5(combined_uuids.encode('utf8')).hexdigest()  # make a hash of the hashes
+
+def verify_token(token, required_scopes):
+    if token == app.config['API_TOKEN']:
+        return {'scope':['any']}
+    else:
+        raise ApiError("permission_denied", "API Token information is not correct")
+
+
+def get_changed_workflows(remote,as_df=False):
+    """
+    Contacts a remote endpoint, fetches its workflow specs, and determines
+    which workflow specs differ between the remote and the local machine.
+    """
+    try:
+        response = requests.get('http://'+remote+'/v1.0/workflow_sync/all',headers={'X-CR-API-KEY':app.config['API_TOKEN']})
+    except requests.exceptions.RequestException:
+        raise ApiError("endpoint error", 'had a problem connecting to '+remote)
+    if response.status_code != 200:
+        raise ApiError("endpoint error", response.text)
+
+    remote = pd.DataFrame(json.loads(response.text))
+
+    # get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
+    local = get_all_spec_state_dataframe().reset_index()
+
+    # merge these on workflow spec id and hash - this will
+    # make two different date columns date_x and date_y
+    different = remote.merge(local,
+                             right_on=['workflow_spec_id','md5_hash'],
+                             left_on=['workflow_spec_id','md5_hash'],
+                             how = 'outer' ,
+                             indicator=True).loc[lambda x : x['_merge']!='both']
+    if len(different)==0:
+        return []
+    # each line has a tag on it - if it was in the left or the right,
+    # label it so we know if that was on the remote or local machine
+    different.loc[different['_merge']=='left_only','location'] = 'remote'
+    different.loc[different['_merge']=='right_only','location'] = 'local'
+
+    # this takes the different date_created_x and date_created_y columns and
+    # combines them back into one date_created column
+    index = different['date_created_x'].isnull()
+    different.loc[index,'date_created_x'] = different[index]['date_created_y']
+    different = different[['workflow_spec_id','date_created_x','location']].copy()
+    different.columns=['workflow_spec_id','date_created','location']
+
+    # our different list will have multiple entries for a workflow if there is a version on either side
+    # we want to grab the most recent one, so we sort and grab the most recent one for each workflow
+    changedfiles = different.sort_values('date_created',ascending=False).groupby('workflow_spec_id').first()
+
+    # get an exclusive or list of workflow ids - that is we want lists of files that are
+    # on one machine or the other, but not both
+    remote_spec_ids = remote[['workflow_spec_id']]
+    local_spec_ids = local[['workflow_spec_id']]
+    left = remote_spec_ids[~remote_spec_ids['workflow_spec_id'].isin(local_spec_ids['workflow_spec_id'])]
+    right = local_spec_ids[~local_spec_ids['workflow_spec_id'].isin(remote_spec_ids['workflow_spec_id'])]
+
+    # flag files as new that are only on the remote box and remove the files that are only on the local box
+    changedfiles['new'] = False
+    changedfiles.loc[changedfiles.index.isin(left['workflow_spec_id']), 'new'] = True
+    output = changedfiles[~changedfiles.index.isin(right['workflow_spec_id'])]
+
+    # return the list as a dict, let swagger convert it to json
+    if as_df:
+        return output
+    else:
+        return output.reset_index().to_dict(orient='records')
+
+
+def sync_all_changed_workflows(remote):
+
+    workflowsdf = get_changed_workflows(remote,as_df=True)
+    if len(workflowsdf) == 0:
+        return []
+    workflows = workflowsdf.reset_index().to_dict(orient='records')
+    for workflow in workflows:
+        sync_changed_files(remote,workflow['workflow_spec_id'])
+    return [x['workflow_spec_id'] for x in workflows]
+
+
+def sync_changed_files(remote,workflow_spec_id):
+    # make sure that spec is local before syncing files
+    try:
+        remotespectext = requests.get('http://'+remote+'/v1.0/workflow-specification/'+workflow_spec_id,
+                                      headers={'X-CR-API-KEY': app.config['API_TOKEN']})
+    except requests.exceptions.RequestException:
+        raise ApiError("endpoint error", 'had a problem connecting to '+remote)
+
+    if remotespectext.status_code != 200:
+        raise ApiError("endpoint error", remotespectext.text)
+
+    specdict = json.loads(remotespectext.text)
+
+    localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
+    if localspec is None:
+        localspec = WorkflowSpecModel()
+        localspec.id = workflow_spec_id
+    if specdict['category'] is None:
+        localspec.category = None
+    else:
+        localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name
+                                                                        == specdict['category']['name']).first()
+        if localcategory is None:
+            # category doesn't exist - let's make it
+            localcategory = WorkflowSpecCategoryModel()
+            localcategory.name = specdict['category']['name']
+            localcategory.display_name = specdict['category']['display_name']
+            localcategory.display_order = specdict['category']['display_order']
+            session.add(localcategory)
+        localspec.category = localcategory
+
+    localspec.display_order = specdict['display_order']
+    localspec.display_name = specdict['display_name']
+    localspec.name = specdict['name']
+    localspec.description = specdict['description']
+    session.add(localspec)
+
+    changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True)
+    if len(changedfiles)==0:
+        return []
+    updatefiles = changedfiles[~((changedfiles['new']==True) & (changedfiles['location']=='local'))]
+    updatefiles = updatefiles.reset_index().to_dict(orient='records')
+
+    deletefiles = changedfiles[((changedfiles['new']==True) & (changedfiles['location']=='local'))]
+    deletefiles = deletefiles.reset_index().to_dict(orient='records')
+
+    for delfile in deletefiles:
+        currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
+                                                      FileModel.name == delfile['filename']).first()
+
+        # it is more appropriate to archive the file than delete
+        # due to the fact that we might have workflows that are using the
+        # file data
+        currentfile.archived = True
+        session.add(currentfile)
+
+    for updatefile in updatefiles:
+        currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
+                                                      FileModel.name == updatefile['filename']).first()
+        if not currentfile:
+            currentfile = FileModel()
+            currentfile.name = updatefile['filename']
+            currentfile.workflow_spec_id = workflow_spec_id
+
+        currentfile.date_created = updatefile['date_created']
+        currentfile.type = updatefile['type']
+        currentfile.primary = updatefile['primary']
+        currentfile.content_type = updatefile['content_type']
+        currentfile.primary_process_id = updatefile['primary_process_id']
+        session.add(currentfile)
+        try:
+            response = requests.get('http://'+remote+'/v1.0/file/'+updatefile['md5_hash']+'/hash_data',
+                                    headers={'X-CR-API-KEY': app.config['API_TOKEN']})
+        except requests.exceptions.RequestException:
+            raise ApiError("endpoint error", 'had a problem connecting to ' + remote)
+
+        if response.status_code != 200:
+            raise ApiError("endpoint error", response.text)
+
+        FileService.update_file(currentfile,response.content,updatefile['type'])
+    session.commit()
+    return [x['filename'] for x in updatefiles]
+
+
+def get_changed_files(remote,workflow_spec_id,as_df=False):
+    """
+    Fetches the files for a workflow_spec from a remote endpoint, compares them
+    with the local files, and returns a list of the files that have changed.
+    """
+    try:
+        response = requests.get('http://'+remote+'/v1.0/workflow_sync/'+workflow_spec_id+'/files',
+                                headers={'X-CR-API-KEY':app.config['API_TOKEN']})
+    except requests.exceptions.RequestException:
+        raise ApiError("endpoint error", 'had a problem connecting to '+remote)
+
+    if response.status_code != 200:
+        raise ApiError("endpoint error", response.text)
+
+    # This is probably insecure and may allow cross-site attacks - fix later
+    remote = pd.DataFrame(json.loads(response.text))
+    # get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
+    local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
+    local['md5_hash'] = local['md5_hash'].astype('str')
+    different = remote.merge(local,
+                             right_on=['filename','md5_hash'],
+                             left_on=['filename','md5_hash'],
+                             how = 'outer' ,
+                             indicator=True).loc[lambda x : x['_merge']!='both']
+    if len(different) == 0:
+        if as_df:
+            return different
+        else:
+            return []
+    # each line has a tag on it - if it was in the left or the right,
+    # label it so we know if that was on the remote or local machine
+    different.loc[different['_merge']=='left_only','location'] = 'remote'
+    different.loc[different['_merge']=='right_only','location'] = 'local'
+
+    # this takes the different date_created_x and date_created_y columns and
+    # combines them back into one date_created column
+    dualfields = ['date_created','type','primary','content_type','primary_process_id']
+    for merge in dualfields:
+        index = different[merge+'_x'].isnull()
+        different.loc[index,merge+'_x'] = different[index][merge+'_y']
+
+    fieldlist = [fld+'_x' for fld in dualfields]
+    different = different[ fieldlist + ['md5_hash','filename','location']].copy()
+
+    different.columns=dualfields+['md5_hash','filename','location']
+    # our different list will have multiple entries for a file if there is a version on either side
+    # we want to grab the most recent one, so we sort and grab the most recent one for each file
+    changedfiles = different.sort_values('date_created',ascending=False).groupby('filename').first()
+
+    # get an exclusive or list of filenames - that is we want lists of files that are
+    # on one machine or the other, but not both
+    remote_spec_ids = remote[['filename']]
+    local_spec_ids = local[['filename']]
+    left = remote_spec_ids[~remote_spec_ids['filename'].isin(local_spec_ids['filename'])]
+    right = local_spec_ids[~local_spec_ids['filename'].isin(remote_spec_ids['filename'])]
+    changedfiles['new'] = False
+    changedfiles.loc[changedfiles.index.isin(left['filename']), 'new'] = True
+    changedfiles.loc[changedfiles.index.isin(right['filename']),'new'] = True
+    changedfiles = changedfiles.replace({pd.np.nan: None})
+    # return the list as a dict, let swagger convert it to json
+    if as_df:
+        return changedfiles
+    else:
+        return changedfiles.reset_index().to_dict(orient='records')
+
+
+
+def get_all_spec_state():
+    """
+    Return a list of all workflow specs along with last updated date and a
+    thumbprint of all of the files that are used for that workflow_spec
+    Convert into a dict list from a dataframe
+    """
+    df = get_all_spec_state_dataframe()
+    return df.reset_index().to_dict(orient='records')
+
+
+def get_workflow_spec_files(workflow_spec_id):
+    """
+    Return a list of all files for a workflow_spec along with last updated date and
+    the md5_hash of each file
+    Convert into a dict list from a dataframe
+    """
+    df = get_workflow_spec_files_dataframe(workflow_spec_id)
+    return df.reset_index().to_dict(orient='records')
+
+
+def get_workflow_spec_files_dataframe(workflowid):
+    """
+    Return a list of all files for a workflow_spec along with last updated date and a
+    hash so we can determine file differences for a changed workflow on a box.
+    Return a dataframe
+    """
+    x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid)
+    # there might be a cleaner way of getting a dataframe from some of the
+    # fields in the ORM - but this works OK
+    filelist = []
+    for file in x:
+        filelist.append({'file_model_id':file.file_model_id,
+                         'workflow_spec_id': file.file_model.workflow_spec_id,
+                         'md5_hash':file.md5_hash,
+                         'filename':file.file_model.name,
+                         'type':file.file_model.type.name,
+                         'primary':file.file_model.primary,
+                         'content_type':file.file_model.content_type,
+                         'primary_process_id':file.file_model.primary_process_id,
+                         'date_created':file.date_created})
+    if len(filelist) == 0:
+        return pd.DataFrame(columns=['file_model_id',
+                                     'workflow_spec_id',
+                                     'md5_hash',
+                                     'filename',
+                                     'type',
+                                     'primary',
+                                     'content_type',
+                                     'primary_process_id',
+                                     'date_created'])
+    df = pd.DataFrame(filelist).sort_values('date_created').groupby('file_model_id').last()
+    df['date_created'] = df['date_created'].astype('str')
+    return df
+
+
+
+def get_all_spec_state_dataframe():
+    """
+    Return a list of all workflow specs along with last updated date and a
+    thumbprint of all of the files that are used for that workflow_spec
+    Return a dataframe
+    """
+    x = session.query(FileDataModel).join(FileModel)
+    # there might be a cleaner way of getting a dataframe from some of the
+    # fields in the ORM - but this works OK
+    filelist = []
+    for file in x:
+        filelist.append({'file_model_id':file.file_model_id,
+                         'workflow_spec_id': file.file_model.workflow_spec_id,
+                         'md5_hash':file.md5_hash,
+                         'filename':file.file_model.name,
+                         'date_created':file.date_created})
+    df = pd.DataFrame(filelist)
+
+    # get a distinct list of file_model_id's with the most recent file_data retained
+    df = df.sort_values('date_created').drop_duplicates(['file_model_id'],keep='last').copy()
+
+    # take that list and then group by workflow_spec and retain the most recently touched file
+    # and make a consolidated hash of the md5_checksums - this acts as a 'thumbprint' for each
+    # workflow spec
+    df = df.groupby('workflow_spec_id').agg({'date_created':'max',
+                                             'md5_hash':join_uuids}).copy()
+    # get only the columns we are really interested in returning
+    df = df[['date_created','md5_hash']].copy()
+    # convert dates to string
+    df['date_created'] = df['date_created'].astype('str')
+    return df
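
A minimal sketch of how the new sync API might be exercised once this patch is applied. The local base URL, port, remote host name, and token value below are placeholders; the /v1.0/workflow_sync/... paths, the 'remote' query parameter, and the X-CR-API-KEY header come from the crc/api.yml and crc/api/workflow_sync.py changes above.

    import requests

    API_TOKEN = 'replace-with-shared-API_TOKEN'   # placeholder; must match the API_TOKEN config on both boxes
    LOCAL = 'http://localhost:5000/v1.0'          # assumed base URL of the local instance
    REMOTE = 'remote.crconnect.example.org'       # hypothetical remote host, passed as the 'remote' query parameter
    HEADERS = {'X-CR-API-KEY': API_TOKEN}

    # Ask the local instance which workflow specs differ from the remote one.
    diff = requests.get(LOCAL + '/workflow_sync/diff', params={'remote': REMOTE}, headers=HEADERS)
    diff.raise_for_status()

    # Pull the changed files for each differing spec, mirroring what sync_all_changed_workflows() does.
    for spec in diff.json():
        synced = requests.get(LOCAL + '/workflow_sync/' + spec['workflow_spec_id'] + '/files/sync',
                              params={'remote': REMOTE}, headers=HEADERS)
        synced.raise_for_status()
        print(spec['workflow_spec_id'], synced.json())

Note that the diff records returned by get_changed_workflows() are keyed by 'workflow_spec_id', even though the WorkflowSpecDiffList schema in crc/api.yml now names that property workflow_sync_id.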