Cleaned up the Workflow Processor's init method

* Moved all the performance metric code into a separate function.
* Restructured the code so it either creates a new workflow or deserializes an existing one.
* Added code to upgrade serialized workflows from version 1.0 to 1.1 (see the sketch below the bug-fix list).
* Switched to the new method of creating a `bpmn_workflow` object:
```python
parser = self.get_spec_parser(self.spec_files, spec_info)
top_level = parser.get_spec(spec_info.primary_process_id)
subprocesses = parser.get_process_specs()
self.bpmn_workflow = BpmnWorkflow(top_level, subprocesses, script_engine=self._script_engine)
```
Fixed a few minor bugs that stood out while testing:
1. When updating a workflow, check for a valid task BEFORE calling cancel_notify, which requires a valid task.
2. get_localtime - quick fix to the date parser for Python 3.9, which otherwise emits a PytzUsageWarning.
3. The start_workflow script would error out in a way that made it unclear which workflow was having the problem. The error is now wrapped in a WorkflowTaskExecException that names the workflow spec.
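
On the serializer upgrade: the 1.0 → 1.1 path boils down to a version check plus SpiffWorkflow's migration table, using the same calls as the workflow_processor diff further down. A minimal sketch (the `load_workflow` helper is illustrative, not part of the commit):
```python
import json

from SpiffWorkflow.bpmn.serializer.version_migration import MIGRATIONS


def load_workflow(serializer, bpmn_workflow_json):
    """Deserialize a workflow, upgrading 1.0-CRC payloads before loading."""
    workflow_dict = json.loads(bpmn_workflow_json)
    version = serializer.get_version(workflow_dict)
    if version == "1.0-CRC":
        # MIGRATIONS maps an old serializer version to its upgrade function.
        workflow_dict = MIGRATIONS['1.0'](workflow_dict)
    return serializer.workflow_from_dict(workflow_dict)
```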
Dan committed 2022-06-24 11:34:39 -04:00
parent 9378a2ae1f, commit b15750d2e4
6 changed files with 127 additions and 97 deletions


```diff
@@ -256,11 +256,13 @@ def set_current_task(workflow_id, task_id):
     processor = WorkflowProcessor(workflow_model)
     task_id = uuid.UUID(task_id)
     spiff_task = processor.bpmn_workflow.get_task(task_id)
-    cancel_notify = (spiff_task.state == TaskState.COMPLETED and
-                     spiff_task.task_spec.__class__.__name__ != 'EndEvent')
     if not spiff_task:
         # An invalid task_id was requested.
         raise ApiError("invalid_task", "The Task you requested no longer exists as a part of this workflow.")
+    cancel_notify = (spiff_task.state == TaskState.COMPLETED and
+                     spiff_task.task_spec.__class__.__name__ != 'EndEvent')
     _verify_user_and_role(processor, spiff_task)
     user_uid = UserService.current_user(allow_admin_impersonate=True).uid
```


```diff
@@ -29,7 +29,8 @@ class GetLocaltime(Script):
             timezone = args[1]
         else:
             timezone = 'US/Eastern'
-        parsed_timestamp = dateparser.parse(timestamp)
+        # With Python 3.9, not passing the timezone results in a PytzUsageWarning.
+        parsed_timestamp = dateparser.parse(timestamp, settings={'TIMEZONE': 'UTC'})
         localtime = parsed_timestamp.astimezone(pytz.timezone(timezone))
         return localtime
```
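
A standalone repro of the change above; dateparser and pytz are the same libraries the script uses, while the timestamp literal and the print are illustrative:
```python
import dateparser
import pytz

timestamp = "2022-06-24 11:34:39"  # illustrative input
# Per this commit, passing an explicit TIMEZONE setting avoids the
# PytzUsageWarning that parsing otherwise triggers under Python 3.9.
parsed_timestamp = dateparser.parse(timestamp, settings={'TIMEZONE': 'UTC'})
localtime = parsed_timestamp.astimezone(pytz.timezone('US/Eastern'))
print(localtime.isoformat())
```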


```diff
@@ -1,3 +1,5 @@
+from SpiffWorkflow.exceptions import WorkflowTaskExecException
 from crc import session
 from crc.api.common import ApiError
 from crc.models.api_models import WorkflowApi, WorkflowApiSchema
@@ -25,14 +27,14 @@ class StartWorkflow(Script):
             raise ApiError(code='missing_parameter',
                            message=f'The start_workflow script requires a workflow id')
-        workflow = session.query(WorkflowModel).\
-            filter(WorkflowModel.study_id==study_id).\
-            filter(WorkflowModel.workflow_spec_id==workflow_spec_id).\
+        workflow = session.query(WorkflowModel). \
+            filter(WorkflowModel.study_id == study_id). \
+            filter(WorkflowModel.workflow_spec_id == workflow_spec_id). \
             first()
-        if not(workflow):
+        if not (workflow):
             raise ApiError(code='unknown_workflow',
-                           message=f"We could not find a workflow with workflow_spec_id '{workflow_spec_id}'.")
+                            message=f"We could not find a workflow with workflow_spec_id '{workflow_spec_id}'.")
         return workflow
@@ -43,8 +45,12 @@ class StartWorkflow(Script):
         workflow_model = self.get_workflow(study_id, *args, **kwargs)
         if workflow_model.status != WorkflowStatus.not_started:
             return  # This workflow has already started, don't execute these next very expensive lines.
-        processor = WorkflowProcessor(workflow_model)
-        processor.do_engine_steps()
-        processor.save()
-        WorkflowService.update_task_assignments(processor)
+        try:
+            processor = WorkflowProcessor(workflow_model)
+            processor.do_engine_steps()
+            processor.save()
+            WorkflowService.update_task_assignments(processor)
+        except ApiError as e:
+            msg = f"Failed to execute start_workflow('{workflow_model.workflow_spec_id}'). " + e.message
+            te = WorkflowTaskExecException(task, msg)
+            raise te
```


```diff
@@ -18,6 +18,7 @@ from SpiffWorkflow.camunda.parser.CamundaParser import CamundaParser
 from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser
 from SpiffWorkflow.exceptions import WorkflowTaskExecException
 from SpiffWorkflow.specs import WorkflowSpec
+from SpiffWorkflow.bpmn.serializer.version_migration import MIGRATIONS
 from crc import session
 from crc.api.common import ApiError
@@ -88,9 +89,12 @@ class MyCustomParser(BpmnDmnParser):
     OVERRIDE_PARSER_CLASSES = BpmnDmnParser.OVERRIDE_PARSER_CLASSES
     OVERRIDE_PARSER_CLASSES.update(CamundaParser.OVERRIDE_PARSER_CLASSES)
 
 class WorkflowProcessor(object):
     _script_engine = CustomBpmnScriptEngine()
-    SERIALIZER_VERSION = "1.0-CRC"
+    SERIALIZER_VERSION_1_0 = "1.0-CRC"
+    SERIALIZER_VERSION_1_1 = "1.1-CRC"
+    SERIALIZER_VERSION = SERIALIZER_VERSION_1_1
     wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(
         [UserTaskConverter, BusinessRuleTaskConverter])
     _serializer = BpmnWorkflowSerializer(wf_spec_converter, version=SERIALIZER_VERSION)
```
```diff
@@ -104,65 +108,75 @@ class WorkflowProcessor(object):
         self.workflow_model = workflow_model
         self.workflow_spec_service = WorkflowSpecService()
-        spec = None
+        if not workflow_model.id:
+            session.add(workflow_model)  ## Assure we have a workflow model id.
         if workflow_model.bpmn_workflow_json is None:
+            # Create a new workflow
             spec_info = self.workflow_spec_service.get_spec(workflow_model.workflow_spec_id)
             if spec_info is None:
                 raise (ApiError("missing_spec", "The spec this workflow references does not currently exist."))
             self.spec_files = SpecFileService.get_files(spec_info, include_libraries=True)
-            spec = self.get_spec(self.spec_files, spec_info)
-        else:
-            B = len(workflow_model.bpmn_workflow_json.encode('utf-8'))
-            MB = float(1024 ** 2)
-            json_size = B/MB
-            if json_size > 1:
-                wf_json = json.loads(workflow_model.bpmn_workflow_json)
-                if 'spec' in wf_json and 'tasks' in wf_json:
-                    task_tree = wf_json['tasks']
-                    test_spec = wf_json['spec']
-                    task_size = "{:.2f}".format(len(json.dumps(task_tree).encode('utf-8'))/MB)
-                    spec_size = "{:.2f}".format(len(json.dumps(test_spec).encode('utf-8'))/MB)
-                    message = 'Workflow ' + workflow_model.workflow_spec_id + ' JSON Size is over 1MB:{0:.2f} MB'.format(json_size)
-                    message += f"\n Task Size: {task_size}"
-                    message += f"\n Spec Size: {spec_size}"
-                    app.logger.warning(message)
-
-                    def check_sub_specs(test_spec, indent=0, show_all=False):
-                        for my_spec_name in test_spec['task_specs']:
-                            my_spec = test_spec['task_specs'][my_spec_name]
-                            my_spec_size = len(json.dumps(my_spec).encode('utf-8')) / MB
-                            if my_spec_size > 0.1 or show_all:
-                                app.logger.warning((' ' * indent) + 'Sub-Spec ' + my_spec['name'] + ' :' + "{:.2f}".format(my_spec_size))
-                            if 'spec' in my_spec:
-                                my_show_all = False
-                                if my_spec['name'] == 'Call_Emails_Process_Email':
-                                    my_show_all = True
-                                check_sub_specs(my_spec['spec'], indent + 5)
-                    check_sub_specs(test_spec, 5)
+            try:
+                parser = self.get_spec_parser(self.spec_files, spec_info)
+                top_level = parser.get_spec(spec_info.primary_process_id)
+                subprocesses = parser.get_process_specs()
+                self.bpmn_workflow = BpmnWorkflow(top_level, subprocesses, script_engine=self._script_engine)
+                self.bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = workflow_model.study_id
+                self.bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = validate_only
+                self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
+                workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_json(self.bpmn_workflow)
+                self.save()
+            except ValidationException as ve:
+                raise ApiError(code="workflow_validation_error",
+                               message="Failed to parse the Workflow Specification. " +
+                                       "Error is '%s.'" % str(ve),
+                               file_name=ve.filename,
+                               task_id=ve.id,
+                               tag=ve.tag)
+        else:
+            self.bpmn_workflow = WorkflowProcessor.deserialize_workflow(workflow_model)
+            self.bpmn_workflow.script_engine = self._script_engine
         self.workflow_spec_id = workflow_model.workflow_spec_id
-        try:
-            self.bpmn_workflow = self.__get_bpmn_workflow(workflow_model, spec, validate_only)
-            self.bpmn_workflow.script_engine = self._script_engine
-            if self.WORKFLOW_ID_KEY not in self.bpmn_workflow.data:
-                if not workflow_model.id:
-                    session.add(workflow_model)
-                    # If the model is new, and has no id, save it, write it into the workflow model
-                    # and save it again. In this way, the workflow process is always aware of the
-                    # database model to which it is associated, and scripts running within the model
-                    # can then load data as needed.
-                self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
-                workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_json(self.bpmn_workflow)
-                self.save()
-        except MissingSpecError as ke:
-            raise ApiError(code="unexpected_workflow_structure",
-                           message="Failed to deserialize workflow"
-                                   " '%s' due to a mis-placed or missing task '%s'" %
-                                   (self.workflow_spec_id, str(ke)))
+
+    def __calculate_workflow_stats(self, workflow_model):
+        """We've frequently had performance issues related to the size of a serialized workflow.
+        This helped us track those issues down, and is left here in case it proves useful in the future.
+        NO LONGER USED!!!!"""
+        B = len(workflow_model.bpmn_workflow_json.encode('utf-8'))
+        MB = float(1024 ** 2)
+        json_size = B / MB
+        if json_size > 1:
+            wf_json = json.loads(workflow_model.bpmn_workflow_json)
+            if 'spec' in wf_json and 'tasks' in wf_json:
+                task_tree = wf_json['tasks']
+                test_spec = wf_json['spec']
+                task_size = "{:.2f}".format(len(json.dumps(task_tree).encode('utf-8')) / MB)
+                spec_size = "{:.2f}".format(len(json.dumps(test_spec).encode('utf-8')) / MB)
+                message = 'Workflow ' + workflow_model.workflow_spec_id + ' JSON Size is over 1MB:{0:.2f} MB'.format(
+                    json_size)
+                message += f"\n Task Size: {task_size}"
+                message += f"\n Spec Size: {spec_size}"
+                app.logger.warning(message)
+
+                def check_sub_specs(test_spec, indent=0, show_all=False):
+                    for my_spec_name in test_spec['task_specs']:
+                        my_spec = test_spec['task_specs'][my_spec_name]
+                        my_spec_size = len(json.dumps(my_spec).encode('utf-8')) / MB
+                        if my_spec_size > 0.1 or show_all:
+                            app.logger.warning(
+                                (' ' * indent) + 'Sub-Spec ' + my_spec['name'] + ' :' + "{:.2f}".format(my_spec_size))
+                        if 'spec' in my_spec:
+                            my_show_all = False
+                            if my_spec['name'] == 'Call_Emails_Process_Email':
+                                my_show_all = True
+                            check_sub_specs(my_spec['spec'], indent + 5)
+                check_sub_specs(test_spec, 5)
 
     @staticmethod
     def reset(workflow_model, clear_data=False):
```
```diff
@@ -174,8 +188,8 @@ class WorkflowProcessor(object):
         # Try to execute a cancel notify
         try:
-            bpmn_workflow = WorkflowProcessor.__get_bpmn_workflow(workflow_model)
-            WorkflowProcessor.__cancel_notify(bpmn_workflow)
+            processor = WorkflowProcessor(workflow_model)
+            processor.cancel_notify()
         except Exception as e:
             session.rollback()  # in case the above left the database with a bad transaction
             app.logger.error(f"Unable to send a cancel notify for workflow %s during a reset."
```
```diff
@@ -206,21 +220,26 @@ class WorkflowProcessor(object):
     @staticmethod
-    def __get_bpmn_workflow(workflow_model: WorkflowModel, spec: WorkflowSpec = None, validate_only=False):
-        if workflow_model.bpmn_workflow_json:
-            version = WorkflowProcessor._serializer.get_version(workflow_model.bpmn_workflow_json)
-            if(version == WorkflowProcessor.SERIALIZER_VERSION):
-                bpmn_workflow = WorkflowProcessor._serializer.deserialize_json(workflow_model.bpmn_workflow_json)
-            else:
-                bpmn_workflow = WorkflowProcessor.\
-                    _old_serializer.deserialize_workflow(workflow_model.bpmn_workflow_json,
-                                                         workflow_spec=spec)
-            bpmn_workflow.script_engine = WorkflowProcessor._script_engine
-        else:
-            bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine)
-            bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = workflow_model.study_id
-            bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = validate_only
-        return bpmn_workflow
+    def deserialize_workflow(workflow_model: WorkflowModel):
+        workflow_dict = json.loads(workflow_model.bpmn_workflow_json)
+        version = WorkflowProcessor._serializer.get_version(workflow_dict)
+        try:
+            if version == WorkflowProcessor.SERIALIZER_VERSION:
+                bpmn_workflow = WorkflowProcessor._serializer.workflow_from_dict(workflow_dict)
+            elif version == WorkflowProcessor.SERIALIZER_VERSION_1_0:
+                updated_dict = MIGRATIONS['1.0'](workflow_dict)
+                bpmn_workflow = WorkflowProcessor._serializer.workflow_from_dict(updated_dict)
+            else:
+                raise (ApiError(code="invalid_version",
+                                message="Unable to deserialize this workflow # %s, the version is too far "
+                                        "out of date." % workflow_model.id))
+            return bpmn_workflow
+        except MissingSpecError as ke:
+            raise ApiError(code="unexpected_workflow_structure",
+                           message="Failed to deserialize workflow '%s' "
+                                   "due to a mis-placed or missing task '%s'" %
+                                   (workflow_model.workflow_spec_id, str(ke)))
 
     def save(self):
         """Saves the current state of this processor to the database """
```
```diff
@@ -239,14 +258,23 @@ class WorkflowProcessor(object):
         """Executes a BPMN specification for the given study, without recording any information to the database
         Useful for running the master specification, which should not persist. """
         spec_files = SpecFileService().get_files(spec_model, include_libraries=True)
-        spec = WorkflowProcessor.get_spec(spec_files, spec_model)
+        parser = WorkflowProcessor.get_spec_parser(spec_files, spec_model)
         try:
-            bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine)
+            top_level = parser.get_spec(spec_model.primary_process_id)
+            subprocesses = parser.get_process_specs()
+            bpmn_workflow = BpmnWorkflow(top_level, subprocesses, script_engine=WorkflowProcessor._script_engine)
             bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = study.id
             bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = False
             bpmn_workflow.do_engine_steps()
         except WorkflowException as we:
             raise ApiError.from_task_spec("error_running_master_spec", str(we), we.sender)
+        except ValidationException as ve:
+            raise ApiError(code="workflow_validation_error",
+                           message="Failed to parse the Workflow Specification. " +
+                                   "Error is '%s.'" % str(ve),
+                           file_name=ve.filename,
+                           task_id=ve.id,
+                           tag=ve.tag)
 
         if not bpmn_workflow.is_completed():
             raise ApiError("master_spec_not_automatic",
```
```diff
@@ -260,8 +288,8 @@ class WorkflowProcessor(object):
         return parser
 
     @staticmethod
-    def get_spec(files: List[File], workflow_spec_info: WorkflowSpecInfo):
-        """Returns a SpiffWorkflow specification for the given workflow spec,
+    def get_spec_parser(files: List[File], workflow_spec_info: WorkflowSpecInfo):
+        """Returns a SpiffWorkflow parser for the given workflow spec,
         using the files provided. """
         parser = WorkflowProcessor.get_parser()
@@ -276,16 +304,7 @@ class WorkflowProcessor(object):
         if workflow_spec_info.primary_process_id is None or workflow_spec_info.primary_process_id == "":
             raise (ApiError(code="no_primary_bpmn_error",
                             message="There is no primary BPMN model defined for workflow %s" % workflow_spec_info.id))
-        try:
-            spec = parser.get_spec(workflow_spec_info.primary_process_id)
-        except ValidationException as ve:
-            raise ApiError(code="workflow_validation_error",
-                           message="Failed to parse the Workflow Specification. " +
-                                   "Error is '%s.'" % str(ve),
-                           file_name=ve.filename,
-                           task_id=ve.id,
-                           tag=ve.tag)
-        return spec
+        return parser
 
     @staticmethod
     def status_of(bpmn_workflow):
```


```diff
@@ -114,7 +114,6 @@ class TestLookupService(BaseTest):
         results = LookupService.lookup(workflow, task.task_spec.name, "selectedItem", "", value="pigs", limit=10)
         self.assertEqual(0, len(results), "We shouldn't find our animals mixed in with our fruits.")
 
     def test_some_full_text_queries(self):
-        spec = self.load_test_spec('enum_options_from_file')
         workflow = self.create_workflow('enum_options_from_file')
@@ -154,7 +153,9 @@ class TestLookupService(BaseTest):
         results = LookupService.lookup(workflow, "TaskEnumLookup", "AllTheNames", "Inc", limit=10)
         self.assertEqual(7, len(results), "short terms get multiple correct results.")
-        self.assertEqual("Genetics Savings & Clone, Inc.", results[0]['CUSTOMER_NAME'])
+        result_names = [r['CUSTOMER_NAME'] for r in results]
+        self.assertIn("Genetics Savings & Clone, Inc.", result_names)
+        self.assertIn("Intervascular, Inc.", result_names)
 
         results = LookupService.lookup(workflow, "TaskEnumLookup", "AllTheNames", "reaction design", limit=10)
         self.assertEqual(3, len(results), "all results come back for two terms.")
```


```diff
@@ -1,3 +1,4 @@
+import json
 import os
 
 from SpiffWorkflow import TaskState
@@ -352,10 +353,10 @@ class TestWorkflowProcessor(BaseTest):
         processor = WorkflowProcessor(workflow_model)
         processor.do_engine_steps()  # Get the thing up and running.
-        # Use the old serializer to serialize the workflow and set it on the model.
-        old_school_serializer = BpmnSerializer()
-        old_school_json = old_school_serializer.serialize_workflow(processor.bpmn_workflow, include_spec=True)
-        workflow_model.bpmn_workflow_json = old_school_json
+        # Serialize the workflow normally, but alter the version number, so we can exercise that older code.
+        old_school_json = json.loads(processor.serialize())
+        old_school_json['serializer_version'] = "1.0-CRC"
+        workflow_model.bpmn_workflow_json = json.dumps(old_school_json)
         db.session.add(workflow_model)
         db.session.commit()
@@ -365,4 +366,4 @@ class TestWorkflowProcessor(BaseTest):
         processor = WorkflowProcessor(workflow_model)
         new_json = processor.serialize()
-        self.assertIsNotNone(processor._serializer.get_version(new_json))
+        self.assertEqual(processor.SERIALIZER_VERSION, processor._serializer.get_version(new_json))
```