Move all workflow sync stuff into new file

Make changes to api naming scheme
add some error checking around endpoints for missing/invalid endpoints
This commit is contained in:
Kelly McDonald 2020-12-10 10:46:23 -05:00
parent a8203ed01d
commit 3f56dfe484
3 changed files with 362 additions and 331 deletions

View File

@ -100,9 +100,9 @@ paths:
type: array type: array
items: items:
$ref: "#/components/schemas/Study" $ref: "#/components/schemas/Study"
/workflow_spec/pullall: /workflow_sync/pullall:
get: get:
operationId: crc.api.workflow.sync_all_changed_workflows operationId: crc.api.workflow_sync.sync_all_changed_workflows
summary: Sync all workflows that have changed on the remote side and provide a list of the results summary: Sync all workflows that have changed on the remote side and provide a list of the results
security: security:
- ApiKeyAuth : [] - ApiKeyAuth : []
@ -115,7 +115,7 @@ paths:
schema: schema:
type: string type: string
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: An array of workflow specs that were synced from remote. description: An array of workflow specs that were synced from remote.
@ -130,9 +130,9 @@ paths:
/workflow_spec/diff: /workflow_sync/diff:
get: get:
operationId: crc.api.workflow.get_changed_workflows operationId: crc.api.workflow_sync.get_changed_workflows
summary: Provides a list of workflow that differ from remote and if it is new or not summary: Provides a list of workflow that differ from remote and if it is new or not
security : security :
- ApiKeyAuth : [] - ApiKeyAuth : []
@ -145,7 +145,7 @@ paths:
schema: schema:
type: string type: string
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: An array of workflow specs, with last touched date and which one is most recent. description: An array of workflow specs, with last touched date and which one is most recent.
@ -157,9 +157,9 @@ paths:
$ref: "#/components/schemas/WorkflowSpecDiffList" $ref: "#/components/schemas/WorkflowSpecDiffList"
/workflow_spec/{workflow_spec_id}/files: /workflow_sync/{workflow_spec_id}/files:
get: get:
operationId: crc.api.workflow.get_workflow_spec_files operationId: crc.api.workflow_sync.get_workflow_spec_files
summary: Provides a list of files for a workflow spec on this machine. summary: Provides a list of files for a workflow spec on this machine.
security : security :
- ApiKeyAuth : [] - ApiKeyAuth : []
@ -172,7 +172,7 @@ paths:
type: string type: string
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: An array of files for a workflow spec on the local system, with details. description: An array of files for a workflow spec on the local system, with details.
@ -183,9 +183,9 @@ paths:
items: items:
$ref: "#/components/schemas/WorkflowSpecFilesList" $ref: "#/components/schemas/WorkflowSpecFilesList"
/workflow_spec/{workflow_spec_id}/files/sync: /workflow_sync/{workflow_spec_id}/files/sync:
get: get:
operationId: crc.api.workflow.sync_changed_files operationId: crc.api.workflow_sync.sync_changed_files
summary: Syncs files from a workflow on a remote system and provides a list of files that were updated summary: Syncs files from a workflow on a remote system and provides a list of files that were updated
security : security :
- ApiKeyAuth : [] - ApiKeyAuth : []
@ -204,7 +204,7 @@ paths:
type: string type: string
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: A list of files that were synced for the workflow. description: A list of files that were synced for the workflow.
@ -217,9 +217,9 @@ paths:
example : ["data_security_plan.dmn",'some_other_file.xml'] example : ["data_security_plan.dmn",'some_other_file.xml']
/workflow_spec/{workflow_spec_id}/files/diff: /workflow_sync/{workflow_spec_id}/files/diff:
get: get:
operationId: crc.api.workflow.get_changed_files operationId: crc.api.workflow_sync.get_changed_files
summary: Provides a list of files for a workflow specs that differ from remote and their signature. summary: Provides a list of files for a workflow specs that differ from remote and their signature.
security : security :
- ApiKeyAuth : [] - ApiKeyAuth : []
@ -239,7 +239,7 @@ paths:
type: string type: string
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: An array of files that are different from remote, with last touched date and file signature. description: An array of files that are different from remote, with last touched date and file signature.
@ -251,15 +251,15 @@ paths:
$ref: "#/components/schemas/WorkflowSpecFilesDiff" $ref: "#/components/schemas/WorkflowSpecFilesDiff"
/workflow_spec/all: /workflow_sync/all:
get: get:
operationId: crc.api.workflow.get_all_spec_state operationId: crc.api.workflow_sync.get_all_spec_state
summary: Provides a list of workflow specs, last update date and thumbprint summary: Provides a list of workflow specs, last update date and thumbprint
security: security:
- ApiKeyAuth : [] - ApiKeyAuth : []
tags: tags:
- Workflow Spec States - Workflow Sync API
responses: responses:
'200': '200':
description: An array of workflow specs, with last touched date and file signature. description: An array of workflow specs, with last touched date and file signature.
@ -547,7 +547,7 @@ paths:
description: The workflow spec category has been removed. description: The workflow spec category has been removed.
/file: /file:
parameters: parameters:
- name: workflow_spec_id - name: workflow_sync_id
in: query in: query
required: false required: false
description: The unique id of a workflow specification description: The unique id of a workflow specification
@ -1353,7 +1353,7 @@ components:
type : apiKey type : apiKey
in : header in : header
name : X-CR-API-KEY name : X-CR-API-KEY
x-apikeyInfoFunc: crc.api.workflow.verify_token x-apikeyInfoFunc: crc.api.workflow_sync.verify_token
schemas: schemas:
User: User:
@ -1380,7 +1380,7 @@ components:
type: string type: string
WorkflowSpecDiffList: WorkflowSpecDiffList:
properties: properties:
workflow_spec_id: workflow_sync_id:
type: string type: string
example : top_level_workflow example : top_level_workflow
date_created : date_created :
@ -1397,7 +1397,7 @@ components:
file_model_id: file_model_id:
type : integer type : integer
example : 171 example : 171
workflow_spec_id : workflow_sync_id :
type: string type: string
example : top_level_workflow example : top_level_workflow
filename : filename :
@ -1455,7 +1455,7 @@ components:
WorkflowSpecAll: WorkflowSpecAll:
properties: properties:
workflow_spec_id : workflow_sync_id :
type: string type: string
example : acaf1258-43b4-437e-8846-f612afa66811 example : acaf1258-43b4-437e-8846-f612afa66811
date_created : date_created :
@ -1587,7 +1587,7 @@ components:
content_type: content_type:
type: string type: string
example: "application/xml" example: "application/xml"
workflow_spec_id: workflow_sync_id:
type: string type: string
example: "random_fact" example: "random_fact"
x-nullable: true x-nullable: true
@ -1609,7 +1609,7 @@ components:
$ref: "#/components/schemas/NavigationItem" $ref: "#/components/schemas/NavigationItem"
next_task: next_task:
$ref: "#/components/schemas/Task" $ref: "#/components/schemas/Task"
workflow_spec_id: workflow_sync_id:
type: string type: string
spec_version: spec_version:
type: string type: string
@ -1625,7 +1625,7 @@ components:
example: example:
id: 291234 id: 291234
status: 'user_input_required' status: 'user_input_required'
workflow_spec_id: 'random_fact' workflow_sync_id: 'random_fact'
spec_version: 'v1.1 [22,23]' spec_version: 'v1.1 [22,23]'
is_latest_spec: True is_latest_spec: True
next_task: next_task:

View File

@ -1,11 +1,4 @@
import hashlib
import json
import uuid import uuid
from io import StringIO
from hashlib import md5
import pandas as pd
from SpiffWorkflow.util.deep_merge import DeepMerge
from flask import g from flask import g
from crc import session, db, app from crc import session, db, app
from crc.api.common import ApiError, ApiErrorSchema from crc.api.common import ApiError, ApiErrorSchema
@ -21,8 +14,6 @@ from crc.services.study_service import StudyService
from crc.services.user_service import UserService from crc.services.user_service import UserService
from crc.services.workflow_processor import WorkflowProcessor from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService from crc.services.workflow_service import WorkflowService
from flask_cors import cross_origin
import requests
def all_specifications(): def all_specifications():
schema = WorkflowSpecModelSchema(many=True) schema = WorkflowSpecModelSchema(many=True)
@ -263,299 +254,3 @@ def _verify_user_and_role(processor, spiff_task):
raise ApiError.from_task("permission_denied", raise ApiError.from_task("permission_denied",
f"This task must be completed by '{allowed_users}', " f"This task must be completed by '{allowed_users}', "
f"but you are {user.uid}", spiff_task) f"but you are {user.uid}", spiff_task)
def join_uuids(uuids):
"""Joins a pandas Series of uuids and combines them in one hash"""
combined_uuids = ''.join([str(uuid) for uuid in uuids.sort_values()]) # ensure that values are always
# in the same order
return hashlib.md5(combined_uuids.encode('utf8')).hexdigest() # make a hash of the hashes
def verify_token(token, required_scopes):
if token == app.config['API_TOKEN']:
return {'scope':['any']}
else:
raise ApiError("permission_denied","API Token information is not correct")
def get_changed_workflows(remote,as_df=False):
"""
gets a remote endpoint - gets the workflows and then
determines what workflows are different from the remote endpoint
"""
response = requests.get('http://'+remote+'/v1.0/workflow_spec/all',headers={'X-CR-API-KEY':app.config['API_TOKEN']})
# This is probably very and may allow cross site attacks - fix later
remote = pd.DataFrame(json.loads(response.text))
# get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
local = get_all_spec_state_dataframe().reset_index()
# merge these on workflow spec id and hash - this will
# make two different date columns date_x and date_y
different = remote.merge(local,
right_on=['workflow_spec_id','md5_hash'],
left_on=['workflow_spec_id','md5_hash'],
how = 'outer' ,
indicator=True).loc[lambda x : x['_merge']!='both']
# each line has a tag on it - if was in the left or the right,
# label it so we know if that was on the remote or local machine
different.loc[different['_merge']=='left_only','location'] = 'remote'
different.loc[different['_merge']=='right_only','location'] = 'local'
# this takes the different date_created_x and date-created_y columns and
# combines them back into one date_created column
index = different['date_created_x'].isnull()
different.loc[index,'date_created_x'] = different[index]['date_created_y']
different = different[['workflow_spec_id','date_created_x','location']].copy()
different.columns=['workflow_spec_id','date_created','location']
# our different list will have multiple entries for a workflow if there is a version on either side
# we want to grab the most recent one, so we sort and grab the most recent one for each workflow
changedfiles = different.sort_values('date_created',ascending=False).groupby('workflow_spec_id').first()
# get an exclusive or list of workflow ids - that is we want lists of files that are
# on one machine or the other, but not both
remote_spec_ids = remote[['workflow_spec_id']]
local_spec_ids = local[['workflow_spec_id']]
left = remote_spec_ids[~remote_spec_ids['workflow_spec_id'].isin(local_spec_ids['workflow_spec_id'])]
right = local_spec_ids[~local_spec_ids['workflow_spec_id'].isin(remote_spec_ids['workflow_spec_id'])]
# flag files as new that are only on the remote box and remove the files that are only on the local box
changedfiles['new'] = False
changedfiles.loc[changedfiles.index.isin(left['workflow_spec_id']), 'new'] = True
output = changedfiles[~changedfiles.index.isin(right['workflow_spec_id'])]
# return the list as a dict, let swagger convert it to json
if as_df:
return output
else:
return output.reset_index().to_dict(orient='records')
def sync_all_changed_workflows(remote):
workflowsdf = get_changed_workflows(remote,as_df=True)
workflows = workflowsdf.reset_index().to_dict(orient='records')
for workflow in workflows:
sync_changed_files(remote,workflow['workflow_spec_id'])
return [x['workflow_spec_id'] for x in workflows]
def sync_changed_files(remote,workflow_spec_id):
# make sure that spec is local before syncing files
remotespectext = requests.get('http://'+remote+'/v1.0/workflow-specification/'+workflow_spec_id,
headers={'X-CR-API-KEY': app.config['API_TOKEN']})
specdict = json.loads(remotespectext.text)
localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
if localspec is None:
localspec = WorkflowSpecModel()
localspec.id = workflow_spec_id
if specdict['category'] == None:
localspec.category = None
else:
localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name
== specdict['category']['name']).first()
if localcategory == None:
#category doesn't exist - lets make it
localcategory = WorkflowSpecCategoryModel()
localcategory.name = specdict['category']['name']
localcategory.display_name = specdict['category']['display_name']
localcategory.display_order = specdict['category']['display_order']
session.add(localcategory)
localspec.category = localcategory
localspec.display_order = specdict['display_order']
localspec.display_name = specdict['display_name']
localspec.name = specdict['name']
localspec.description = specdict['description']
session.add(localspec)
changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True)
if len(changedfiles)==0:
return []
updatefiles = changedfiles[~((changedfiles['new']==True) & (changedfiles['location']=='local'))]
updatefiles = updatefiles.reset_index().to_dict(orient='records')
deletefiles = changedfiles[((changedfiles['new']==True) & (changedfiles['location']=='local'))]
deletefiles = deletefiles.reset_index().to_dict(orient='records')
for delfile in deletefiles:
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
FileModel.name == delfile['filename']).first()
# it is more appropriate to archive the file than delete
# due to the fact that we might have workflows that are using the
# file data
currentfile.archived = True
session.add(currentfile)
for updatefile in updatefiles:
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
FileModel.name == updatefile['filename']).first()
if not currentfile:
currentfile = FileModel()
currentfile.name = updatefile['filename']
currentfile.workflow_spec_id = workflow_spec_id
currentfile.date_created = updatefile['date_created']
currentfile.type = updatefile['type']
currentfile.primary = updatefile['primary']
currentfile.content_type = updatefile['content_type']
currentfile.primary_process_id = updatefile['primary_process_id']
session.add(currentfile)
response = requests.get('http://'+remote+'/v1.0/file/'+updatefile['md5_hash']+'/hash_data',
headers={'X-CR-API-KEY': app.config['API_TOKEN']})
FileService.update_file(currentfile,response.content,updatefile['type'])
session.commit()
return [x['filename'] for x in updatefiles]
def get_changed_files(remote,workflow_spec_id,as_df=False):
"""
gets a remote endpoint - gets the files for a workflow_spec on both
local and remote and determines what files have been change and returns a list of those
files
"""
response = requests.get('http://'+remote+'/v1.0/workflow_spec/'+workflow_spec_id+'/files',
headers={'X-CR-API-KEY':app.config['API_TOKEN']})
# This is probably very and may allow cross site attacks - fix later
remote = pd.DataFrame(json.loads(response.text))
# get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
local['md5_hash'] = local['md5_hash'].astype('str')
different = remote.merge(local,
right_on=['filename','md5_hash'],
left_on=['filename','md5_hash'],
how = 'outer' ,
indicator=True).loc[lambda x : x['_merge']!='both']
if len(different) == 0:
if as_df:
return different
else:
return []
# each line has a tag on it - if was in the left or the right,
# label it so we know if that was on the remote or local machine
different.loc[different['_merge']=='left_only','location'] = 'remote'
different.loc[different['_merge']=='right_only','location'] = 'local'
# this takes the different date_created_x and date-created_y columns and
# combines them back into one date_created column
dualfields = ['date_created','type','primary','content_type','primary_process_id']
for merge in dualfields:
index = different[merge+'_x'].isnull()
different.loc[index,merge+'_x'] = different[index][merge+'_y']
fieldlist = [fld+'_x' for fld in dualfields]
different = different[ fieldlist + ['md5_hash','filename','location']].copy()
different.columns=dualfields+['md5_hash','filename','location']
# our different list will have multiple entries for a workflow if there is a version on either side
# we want to grab the most recent one, so we sort and grab the most recent one for each workflow
changedfiles = different.sort_values('date_created',ascending=False).groupby('filename').first()
# get an exclusive or list of workflow ids - that is we want lists of files that are
# on one machine or the other, but not both
remote_spec_ids = remote[['filename']]
local_spec_ids = local[['filename']]
left = remote_spec_ids[~remote_spec_ids['filename'].isin(local_spec_ids['filename'])]
right = local_spec_ids[~local_spec_ids['filename'].isin(remote_spec_ids['filename'])]
changedfiles['new'] = False
changedfiles.loc[changedfiles.index.isin(left['filename']), 'new'] = True
changedfiles.loc[changedfiles.index.isin(right['filename']),'new'] = True
changedfiles = changedfiles.replace({pd.np.nan: None})
# return the list as a dict, let swagger convert it to json
if as_df:
return changedfiles
else:
return changedfiles.reset_index().to_dict(orient='records')
def get_all_spec_state():
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Convert into a dict list from a dataframe
"""
df = get_all_spec_state_dataframe()
return df.reset_index().to_dict(orient='records')
def get_workflow_spec_files(workflow_spec_id):
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Convert into a dict list from a dataframe
"""
df = get_workflow_spec_files_dataframe(workflow_spec_id)
return df.reset_index().to_dict(orient='records')
def get_workflow_spec_files_dataframe(workflowid):
"""
Return a list of all files for a workflow_spec along with last updated date and a
hash so we can determine file differences for a changed workflow on a box.
Return a dataframe
"""
x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid)
# there might be a cleaner way of getting a data frome from some of the
# fields in the ORM - but this works OK
filelist = []
for file in x:
filelist.append({'file_model_id':file.file_model_id,
'workflow_spec_id': file.file_model.workflow_spec_id,
'md5_hash':file.md5_hash,
'filename':file.file_model.name,
'type':file.file_model.type.name,
'primary':file.file_model.primary,
'content_type':file.file_model.content_type,
'primary_process_id':file.file_model.primary_process_id,
'date_created':file.date_created})
if len(filelist) == 0:
return pd.DataFrame(columns=['file_model_id',
'workflow_spec_id',
'md5_hash',
'filename',
'type',
'primary',
'content_type',
'primary_process_id',
'date_created'])
df = pd.DataFrame(filelist).sort_values('date_created').groupby('file_model_id').last()
df['date_created'] = df['date_created'].astype('str')
return df
def get_all_spec_state_dataframe():
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Return a dataframe
"""
x = session.query(FileDataModel).join(FileModel)
# there might be a cleaner way of getting a data frome from some of the
# fields in the ORM - but this works OK
filelist = []
for file in x:
filelist.append({'file_model_id':file.file_model_id,
'workflow_spec_id': file.file_model.workflow_spec_id,
'md5_hash':file.md5_hash,
'filename':file.file_model.name,
'date_created':file.date_created})
df = pd.DataFrame(filelist)
# get a distinct list of file_model_id's with the most recent file_data retained
df = df.sort_values('date_created').drop_duplicates(['file_model_id'],keep='last').copy()
# take that list and then group by workflow_spec and retain the most recently touched file
# and make a consolidated hash of the md5_checksums - this acts as a 'thumbprint' for each
# workflow spec
df = df.groupby('workflow_spec_id').agg({'date_created':'max',
'md5_hash':join_uuids}).copy()
# get only the columns we are really interested in returning
df = df[['date_created','md5_hash']].copy()
# convert dates to string
df['date_created'] = df['date_created'].astype('str')
return df

336
crc/api/workflow_sync.py Normal file
View File

@ -0,0 +1,336 @@
import hashlib
import json
import pandas as pd
import requests
from crc import session, app
from crc.api.common import ApiError
from crc.models.file import FileModel, FileDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecCategoryModel
from crc.services.file_service import FileService
def join_uuids(uuids):
"""Joins a pandas Series of uuids and combines them in one hash"""
combined_uuids = ''.join([str(uuid) for uuid in uuids.sort_values()]) # ensure that values are always
# in the same order
return hashlib.md5(combined_uuids.encode('utf8')).hexdigest() # make a hash of the hashes
def verify_token(token, required_scopes):
if token == app.config['API_TOKEN']:
return {'scope':['any']}
else:
raise ApiError("permission_denied", "API Token information is not correct")
def get_changed_workflows(remote,as_df=False):
"""
gets a remote endpoint - gets the workflows and then
determines what workflows are different from the remote endpoint
"""
try:
response = requests.get('http://'+remote+'/v1.0/workflow_sync/all',headers={'X-CR-API-KEY':app.config['API_TOKEN']})
except:
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
if response.status_code != 200:
raise ApiError("endpoint error", response.text)
remote = pd.DataFrame(json.loads(response.text))
# get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
local = get_all_spec_state_dataframe().reset_index()
# merge these on workflow spec id and hash - this will
# make two different date columns date_x and date_y
different = remote.merge(local,
right_on=['workflow_spec_id','md5_hash'],
left_on=['workflow_spec_id','md5_hash'],
how = 'outer' ,
indicator=True).loc[lambda x : x['_merge']!='both']
if len(different)==0:
return []
# each line has a tag on it - if was in the left or the right,
# label it so we know if that was on the remote or local machine
different.loc[different['_merge']=='left_only','location'] = 'remote'
different.loc[different['_merge']=='right_only','location'] = 'local'
# this takes the different date_created_x and date-created_y columns and
# combines them back into one date_created column
index = different['date_created_x'].isnull()
different.loc[index,'date_created_x'] = different[index]['date_created_y']
different = different[['workflow_spec_id','date_created_x','location']].copy()
different.columns=['workflow_spec_id','date_created','location']
# our different list will have multiple entries for a workflow if there is a version on either side
# we want to grab the most recent one, so we sort and grab the most recent one for each workflow
changedfiles = different.sort_values('date_created',ascending=False).groupby('workflow_spec_id').first()
# get an exclusive or list of workflow ids - that is we want lists of files that are
# on one machine or the other, but not both
remote_spec_ids = remote[['workflow_spec_id']]
local_spec_ids = local[['workflow_spec_id']]
left = remote_spec_ids[~remote_spec_ids['workflow_spec_id'].isin(local_spec_ids['workflow_spec_id'])]
right = local_spec_ids[~local_spec_ids['workflow_spec_id'].isin(remote_spec_ids['workflow_spec_id'])]
# flag files as new that are only on the remote box and remove the files that are only on the local box
changedfiles['new'] = False
changedfiles.loc[changedfiles.index.isin(left['workflow_spec_id']), 'new'] = True
output = changedfiles[~changedfiles.index.isin(right['workflow_spec_id'])]
# return the list as a dict, let swagger convert it to json
if as_df:
return output
else:
return output.reset_index().to_dict(orient='records')
def sync_all_changed_workflows(remote):
workflowsdf = get_changed_workflows(remote,as_df=True)
if len(workflowsdf) ==0:
return []
workflows = workflowsdf.reset_index().to_dict(orient='records')
for workflow in workflows:
sync_changed_files(remote,workflow['workflow_spec_id'])
return [x['workflow_spec_id'] for x in workflows]
def sync_changed_files(remote,workflow_spec_id):
# make sure that spec is local before syncing files
try:
remotespectext = requests.get('http://'+remote+'/v1.0/workflow-specification/'+workflow_spec_id,
headers={'X-CR-API-KEY': app.config['API_TOKEN']})
except:
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
if remotespectext.status_code != 200:
raise ApiError("endpoint error", response.text)
specdict = json.loads(remotespectext.text)
localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
if localspec is None:
localspec = WorkflowSpecModel()
localspec.id = workflow_spec_id
if specdict['category'] == None:
localspec.category = None
else:
localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name
== specdict['category']['name']).first()
if localcategory == None:
#category doesn't exist - lets make it
localcategory = WorkflowSpecCategoryModel()
localcategory.name = specdict['category']['name']
localcategory.display_name = specdict['category']['display_name']
localcategory.display_order = specdict['category']['display_order']
session.add(localcategory)
localspec.category = localcategory
localspec.display_order = specdict['display_order']
localspec.display_name = specdict['display_name']
localspec.name = specdict['name']
localspec.description = specdict['description']
session.add(localspec)
changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True)
if len(changedfiles)==0:
return []
updatefiles = changedfiles[~((changedfiles['new']==True) & (changedfiles['location']=='local'))]
updatefiles = updatefiles.reset_index().to_dict(orient='records')
deletefiles = changedfiles[((changedfiles['new']==True) & (changedfiles['location']=='local'))]
deletefiles = deletefiles.reset_index().to_dict(orient='records')
for delfile in deletefiles:
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
FileModel.name == delfile['filename']).first()
# it is more appropriate to archive the file than delete
# due to the fact that we might have workflows that are using the
# file data
currentfile.archived = True
session.add(currentfile)
for updatefile in updatefiles:
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
FileModel.name == updatefile['filename']).first()
if not currentfile:
currentfile = FileModel()
currentfile.name = updatefile['filename']
currentfile.workflow_spec_id = workflow_spec_id
currentfile.date_created = updatefile['date_created']
currentfile.type = updatefile['type']
currentfile.primary = updatefile['primary']
currentfile.content_type = updatefile['content_type']
currentfile.primary_process_id = updatefile['primary_process_id']
session.add(currentfile)
try:
response = requests.get('http://'+remote+'/v1.0/file/'+updatefile['md5_hash']+'/hash_data',
headers={'X-CR-API-KEY': app.config['API_TOKEN']})
except:
raise ApiError("endpoint error", 'had a problem connecting to ' + remote)
if response.status_code != 200:
raise ApiError("endpoint error", response.text)
FileService.update_file(currentfile,response.content,updatefile['type'])
session.commit()
return [x['filename'] for x in updatefiles]
def get_changed_files(remote,workflow_spec_id,as_df=False):
"""
gets a remote endpoint - gets the files for a workflow_spec on both
local and remote and determines what files have been change and returns a list of those
files
"""
try:
response = requests.get('http://'+remote+'/v1.0/workflow_sync/'+workflow_spec_id+'/files',
headers={'X-CR-API-KEY':app.config['API_TOKEN']})
except:
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
if response.status_code != 200:
raise ApiError("endpoint error", response.text)
# This is probably very and may allow cross site attacks - fix later
remote = pd.DataFrame(json.loads(response.text))
# get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
local['md5_hash'] = local['md5_hash'].astype('str')
different = remote.merge(local,
right_on=['filename','md5_hash'],
left_on=['filename','md5_hash'],
how = 'outer' ,
indicator=True).loc[lambda x : x['_merge']!='both']
if len(different) == 0:
if as_df:
return different
else:
return []
# each line has a tag on it - if was in the left or the right,
# label it so we know if that was on the remote or local machine
different.loc[different['_merge']=='left_only','location'] = 'remote'
different.loc[different['_merge']=='right_only','location'] = 'local'
# this takes the different date_created_x and date-created_y columns and
# combines them back into one date_created column
dualfields = ['date_created','type','primary','content_type','primary_process_id']
for merge in dualfields:
index = different[merge+'_x'].isnull()
different.loc[index,merge+'_x'] = different[index][merge+'_y']
fieldlist = [fld+'_x' for fld in dualfields]
different = different[ fieldlist + ['md5_hash','filename','location']].copy()
different.columns=dualfields+['md5_hash','filename','location']
# our different list will have multiple entries for a workflow if there is a version on either side
# we want to grab the most recent one, so we sort and grab the most recent one for each workflow
changedfiles = different.sort_values('date_created',ascending=False).groupby('filename').first()
# get an exclusive or list of workflow ids - that is we want lists of files that are
# on one machine or the other, but not both
remote_spec_ids = remote[['filename']]
local_spec_ids = local[['filename']]
left = remote_spec_ids[~remote_spec_ids['filename'].isin(local_spec_ids['filename'])]
right = local_spec_ids[~local_spec_ids['filename'].isin(remote_spec_ids['filename'])]
changedfiles['new'] = False
changedfiles.loc[changedfiles.index.isin(left['filename']), 'new'] = True
changedfiles.loc[changedfiles.index.isin(right['filename']),'new'] = True
changedfiles = changedfiles.replace({pd.np.nan: None})
# return the list as a dict, let swagger convert it to json
if as_df:
return changedfiles
else:
return changedfiles.reset_index().to_dict(orient='records')
def get_all_spec_state():
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Convert into a dict list from a dataframe
"""
df = get_all_spec_state_dataframe()
return df.reset_index().to_dict(orient='records')
def get_workflow_spec_files(workflow_spec_id):
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Convert into a dict list from a dataframe
"""
df = get_workflow_spec_files_dataframe(workflow_spec_id)
return df.reset_index().to_dict(orient='records')
def get_workflow_spec_files_dataframe(workflowid):
"""
Return a list of all files for a workflow_spec along with last updated date and a
hash so we can determine file differences for a changed workflow on a box.
Return a dataframe
"""
x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid)
# there might be a cleaner way of getting a data frome from some of the
# fields in the ORM - but this works OK
filelist = []
for file in x:
filelist.append({'file_model_id':file.file_model_id,
'workflow_spec_id': file.file_model.workflow_spec_id,
'md5_hash':file.md5_hash,
'filename':file.file_model.name,
'type':file.file_model.type.name,
'primary':file.file_model.primary,
'content_type':file.file_model.content_type,
'primary_process_id':file.file_model.primary_process_id,
'date_created':file.date_created})
if len(filelist) == 0:
return pd.DataFrame(columns=['file_model_id',
'workflow_spec_id',
'md5_hash',
'filename',
'type',
'primary',
'content_type',
'primary_process_id',
'date_created'])
df = pd.DataFrame(filelist).sort_values('date_created').groupby('file_model_id').last()
df['date_created'] = df['date_created'].astype('str')
return df
def get_all_spec_state_dataframe():
"""
Return a list of all workflow specs along with last updated date and a
thumbprint of all of the files that are used for that workflow_spec
Return a dataframe
"""
x = session.query(FileDataModel).join(FileModel)
# there might be a cleaner way of getting a data frome from some of the
# fields in the ORM - but this works OK
filelist = []
for file in x:
filelist.append({'file_model_id':file.file_model_id,
'workflow_spec_id': file.file_model.workflow_spec_id,
'md5_hash':file.md5_hash,
'filename':file.file_model.name,
'date_created':file.date_created})
df = pd.DataFrame(filelist)
# get a distinct list of file_model_id's with the most recent file_data retained
df = df.sort_values('date_created').drop_duplicates(['file_model_id'],keep='last').copy()
# take that list and then group by workflow_spec and retain the most recently touched file
# and make a consolidated hash of the md5_checksums - this acts as a 'thumbprint' for each
# workflow spec
df = df.groupby('workflow_spec_id').agg({'date_created':'max',
'md5_hash':join_uuids}).copy()
# get only the columns we are really interested in returning
df = df[['date_created','md5_hash']].copy()
# convert dates to string
df['date_created'] = df['date_created'].astype('str')
return df