diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 1a38259df..4e460b403 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -97,6 +97,7 @@ from spiffworkflow_backend.services.service_task_service import ServiceTaskDeleg from spiffworkflow_backend.services.spec_file_service import SpecFileService from spiffworkflow_backend.services.user_service import UserService + # Sorry about all this crap. I wanted to move this thing to another file, but # importing a bunch of types causes circular imports. @@ -178,16 +179,16 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore ) return Script.generate_augmented_list(script_attributes_context) - def evaluate(self, task: SpiffTask, expression: str, external_methods=None) -> Any: + def evaluate(self, task: SpiffTask, expression: str, external_methods: Any = None) -> Any: """Evaluate.""" return self._evaluate(expression, task.data, task, external_methods) def _evaluate( - self, - expression: str, - context: Dict[str, Union[Box, str]], - task: Optional[SpiffTask] = None, - external_methods: Optional[Dict[str, Any]] = None, + self, + expression: str, + context: Dict[str, Union[Box, str]], + task: Optional[SpiffTask] = None, + external_methods: Optional[Dict[str, Any]] = None, ) -> Any: """_evaluate.""" methods = self.__get_augment_methods(task) @@ -211,7 +212,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore ) from exception def execute( - self, task: SpiffTask, script: str, external_methods: Any = None + self, task: SpiffTask, script: str, external_methods: Any = None ) -> None: """Execute.""" try: @@ -225,10 +226,10 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore raise WorkflowTaskExecException(task, f" {script}, {e}", e) from e def call_service( - self, - operation_name: str, - operation_params: Dict[str, Any], - task_data: Dict[str, Any], + self, + operation_name: str, + operation_params: Dict[str, Any], + task_data: Dict[str, Any], ) -> Any: """CallService.""" return ServiceTaskDelegate.call_connector( @@ -275,7 +276,7 @@ class ProcessInstanceProcessor: # * get_spec, which returns a spec and any subprocesses (as IdToBpmnProcessSpecMapping dict) # * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow def __init__( - self, process_instance_model: ProcessInstanceModel, validate_only: bool = False + self, process_instance_model: ProcessInstanceModel, validate_only: bool = False ) -> None: """Create a Workflow Processor based on the serialized information available in the process_instance model.""" tld = current_app.config["THREAD_LOCAL_DATA"] @@ -302,7 +303,7 @@ class ProcessInstanceProcessor: ) else: bpmn_json_length = len(process_instance_model.bpmn_json.encode("utf-8")) - megabyte = float(1024**2) + megabyte = float(1024 ** 2) json_size = bpmn_json_length / megabyte if json_size > 1: wf_json = json.loads(process_instance_model.bpmn_json) @@ -316,22 +317,22 @@ class ProcessInstanceProcessor: len(json.dumps(test_spec).encode("utf-8")) / megabyte ) message = ( - "Workflow " - + process_instance_model.process_model_identifier - + f" JSON Size is over 1MB:{json_size:.2f} MB" + "Workflow " + + process_instance_model.process_model_identifier + + f" JSON Size is over 1MB:{json_size:.2f} MB" ) message += f"\n Task Size: {task_size}" message += f"\n Spec Size: {spec_size}" current_app.logger.warning(message) def check_sub_specs( - test_spec: dict, indent: int = 0, show_all: bool = False + test_spec: dict, indent: int = 0, show_all: bool = False ) -> None: """Check_sub_specs.""" for my_spec_name in test_spec["task_specs"]: my_spec = test_spec["task_specs"][my_spec_name] my_spec_size = ( - len(json.dumps(my_spec).encode("utf-8")) / megabyte + len(json.dumps(my_spec).encode("utf-8")) / megabyte ) if my_spec_size > 0.1 or show_all: current_app.logger.warning( @@ -367,13 +368,13 @@ class ProcessInstanceProcessor: raise ApiError( error_code="unexpected_process_instance_structure", message="Failed to deserialize process_instance" - " '%s' due to a mis-placed or missing task '%s'" - % (self.process_model_identifier, str(ke)), + " '%s' due to a mis-placed or missing task '%s'" + % (self.process_model_identifier, str(ke)), ) from ke @classmethod def get_process_model_and_subprocesses( - cls, process_model_identifier: str + cls, process_model_identifier: str ) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]: """Get_process_model_and_subprocesses.""" process_model_info = ProcessModelService.get_process_model( @@ -391,7 +392,7 @@ class ProcessInstanceProcessor: @classmethod def get_bpmn_process_instance_from_process_model( - cls, process_model_identifier: str + cls, process_model_identifier: str ) -> BpmnWorkflow: """Get_all_bpmn_process_identifiers_for_process_model.""" (bpmn_process_spec, subprocesses) = cls.get_process_model_and_subprocesses( @@ -416,7 +417,7 @@ class ProcessInstanceProcessor: return current_user def add_user_info_to_process_instance( - self, bpmn_process_instance: BpmnWorkflow + self, bpmn_process_instance: BpmnWorkflow ) -> None: """Add_user_info_to_process_instance.""" current_user = self.current_user() @@ -429,8 +430,8 @@ class ProcessInstanceProcessor: @staticmethod def get_bpmn_process_instance_from_workflow_spec( - spec: BpmnProcessSpec, - subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, + spec: BpmnProcessSpec, + subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, ) -> BpmnWorkflow: """Get_bpmn_process_instance_from_workflow_spec.""" return BpmnWorkflow( @@ -441,10 +442,10 @@ class ProcessInstanceProcessor: @staticmethod def __get_bpmn_process_instance( - process_instance_model: ProcessInstanceModel, - spec: Optional[BpmnProcessSpec] = None, - validate_only: bool = False, - subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, + process_instance_model: ProcessInstanceModel, + spec: Optional[BpmnProcessSpec] = None, + validate_only: bool = False, + subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, ) -> BpmnWorkflow: """__get_bpmn_process_instance.""" if process_instance_model.bpmn_json: @@ -487,14 +488,14 @@ class ProcessInstanceProcessor: self.save() def raise_if_no_potential_owners( - self, potential_owner_ids: list[int], message: str + self, potential_owner_ids: list[int], message: str ) -> None: """Raise_if_no_potential_owners.""" if not potential_owner_ids: raise (NoPotentialOwnersForTaskError(message)) def get_potential_owner_ids_from_task( - self, task: SpiffTask + self, task: SpiffTask ) -> PotentialOwnerIdList: """Get_potential_owner_ids_from_task.""" task_spec = task.task_spec @@ -666,7 +667,7 @@ class ProcessInstanceProcessor: @staticmethod def backfill_missing_spec_reference_records( - bpmn_process_identifier: str, + bpmn_process_identifier: str, ) -> Optional[str]: """Backfill_missing_spec_reference_records.""" process_models = ProcessModelService.get_process_models(recursive=True) @@ -687,7 +688,7 @@ class ProcessInstanceProcessor: @staticmethod def bpmn_file_full_path_from_bpmn_process_identifier( - bpmn_process_identifier: str, + bpmn_process_identifier: str, ) -> str: """Bpmn_file_full_path_from_bpmn_process_identifier.""" if bpmn_process_identifier is None: @@ -697,8 +698,8 @@ class ProcessInstanceProcessor: spec_reference = ( SpecReferenceCache.query.filter_by(identifier=bpmn_process_identifier) - .filter_by(type="process") - .first() + .filter_by(type="process") + .first() ) bpmn_file_full_path = None if spec_reference is None: @@ -717,15 +718,15 @@ class ProcessInstanceProcessor: ApiError( error_code="could_not_find_bpmn_process_identifier", message="Could not find the the given bpmn process identifier from any sources: %s" - % bpmn_process_identifier, + % bpmn_process_identifier, ) ) return os.path.abspath(bpmn_file_full_path) @staticmethod def update_spiff_parser_with_all_process_dependency_files( - parser: BpmnDmnParser, - processed_identifiers: Optional[set[str]] = None, + parser: BpmnDmnParser, + processed_identifiers: Optional[set[str]] = None, ) -> None: """Update_spiff_parser_with_all_process_dependency_files.""" if processed_identifiers is None: @@ -763,7 +764,7 @@ class ProcessInstanceProcessor: @staticmethod def get_spec( - files: List[File], process_model_info: ProcessModelInfo + files: List[File], process_model_info: ProcessModelInfo ) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]: """Returns a SpiffWorkflow specification for the given process_instance spec, using the files provided.""" parser = ProcessInstanceProcessor.get_parser() @@ -777,14 +778,14 @@ class ProcessInstanceProcessor: dmn: etree.Element = etree.fromstring(data) parser.add_dmn_xml(dmn, filename=file.name) if ( - process_model_info.primary_process_id is None - or process_model_info.primary_process_id == "" + process_model_info.primary_process_id is None + or process_model_info.primary_process_id == "" ): raise ( ApiError( error_code="no_primary_bpmn_error", message="There is no primary BPMN process id defined for process_model %s" - % process_model_info.id, + % process_model_info.id, ) ) ProcessInstanceProcessor.update_spiff_parser_with_all_process_dependency_files( @@ -802,7 +803,7 @@ class ProcessInstanceProcessor: raise ApiError( error_code="process_instance_validation_error", message="Failed to parse the Workflow Specification. " - + "Error is '%s.'" % str(ve), + + "Error is '%s.'" % str(ve), file_name=ve.filename, task_id=ve.id, tag=ve.tag, @@ -849,12 +850,12 @@ class ProcessInstanceProcessor: message_correlations = [] for ( - message_correlation_key, - message_correlation_properties, + message_correlation_key, + message_correlation_properties, ) in bpmn_message.correlations.items(): for ( - message_correlation_property_identifier, - message_correlation_property_value, + message_correlation_property_identifier, + message_correlation_property_value, ) in message_correlation_properties.items(): message_correlation_property = ( MessageCorrelationPropertyModel.query.filter_by( @@ -946,7 +947,7 @@ class ProcessInstanceProcessor: db.session.add(message_instance) for ( - spiff_correlation_property + spiff_correlation_property ) in waiting_task.task_spec.event_definition.correlation_properties: # NOTE: we may have to cycle through keys here # not sure yet if it's valid for a property to be associated with multiple keys @@ -956,9 +957,9 @@ class ProcessInstanceProcessor: process_instance_id=self.process_instance_model.id, name=correlation_key_name, ) - .join(MessageCorrelationPropertyModel) - .filter_by(identifier=spiff_correlation_property.name) - .first() + .join(MessageCorrelationPropertyModel) + .filter_by(identifier=spiff_correlation_property.name) + .first() ) message_correlation_message_instance = ( MessageCorrelationMessageInstanceModel( @@ -1062,12 +1063,12 @@ class ProcessInstanceProcessor: endtasks = [] if self.bpmn_process_instance.is_completed(): for task in SpiffTask.Iterator( - self.bpmn_process_instance.task_tree, TaskState.ANY_MASK + self.bpmn_process_instance.task_tree, TaskState.ANY_MASK ): # Assure that we find the end event for this process_instance, and not for any sub-process_instances. if ( - isinstance(task.task_spec, EndEvent) - and task.workflow == self.bpmn_process_instance + isinstance(task.task_spec, EndEvent) + and task.workflow == self.bpmn_process_instance ): endtasks.append(task) if len(endtasks) > 0: @@ -1103,8 +1104,8 @@ class ProcessInstanceProcessor: return task for task in ready_tasks: if ( - self.bpmn_process_instance.last_task - and task.parent == last_user_task.parent + self.bpmn_process_instance.last_task + and task.parent == last_user_task.parent ): return task @@ -1114,7 +1115,7 @@ class ProcessInstanceProcessor: # and return that next_task = None for task in SpiffTask.Iterator( - self.bpmn_process_instance.task_tree, TaskState.NOT_FINISHED_MASK + self.bpmn_process_instance.task_tree, TaskState.NOT_FINISHED_MASK ): next_task = task return next_task @@ -1198,7 +1199,7 @@ class ProcessInstanceProcessor: t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec) - and t.state in [TaskState.COMPLETED, TaskState.CANCELLED] + and t.state in [TaskState.COMPLETED, TaskState.CANCELLED] ] def get_all_waiting_tasks(self) -> list[SpiffTask]: @@ -1213,7 +1214,7 @@ class ProcessInstanceProcessor: @classmethod def get_task_by_bpmn_identifier( - cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow + cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow ) -> Optional[SpiffTask]: """Get_task_by_id.""" all_tasks = bpmn_process_instance.get_tasks(TaskState.ANY_MASK)