diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
index 7711c36f9..eaf67f6c9 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
@@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None:
     if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
         database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
         if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
-            app.config["SQLALCHEMY_DATABASE_URI"] = (
-                f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
-            )
+            app.config[
+                "SQLALCHEMY_DATABASE_URI"
+            ] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
         elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
-            app.config["SQLALCHEMY_DATABASE_URI"] = (
-                f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
-            )
+            app.config[
+                "SQLALCHEMY_DATABASE_URI"
+            ] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
         else:
             # use pswd to trick flake8 with hardcoded passwords
             db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
index 009a7486e..44fe82764 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
@@ -127,9 +127,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
     def serialized_with_metadata(self) -> dict[str, Any]:
         process_instance_attributes = self.serialized
         process_instance_attributes["process_metadata"] = self.process_metadata
-        process_instance_attributes["process_model_with_diagram_identifier"] = (
-            self.process_model_with_diagram_identifier
-        )
+        process_instance_attributes[
+            "process_model_with_diagram_identifier"
+        ] = self.process_model_with_diagram_identifier
         return process_instance_attributes
 
     @property
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index ff5900c67..3452efdd4 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -20,6 +20,7 @@ from flask import make_response
 from flask import stream_with_context
 from flask.wrappers import Response
 from jinja2 import TemplateSyntaxError
+from SpiffWorkflow.bpmn.workflow import BpmnWorkflow  # type: ignore
 from SpiffWorkflow.exceptions import WorkflowTaskException  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
 from SpiffWorkflow.task import TaskState
@@ -385,46 +386,65 @@ def _render_instructions_for_end_user(task_model: TaskModel, extensions: Optiona
 def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[str, Optional[str], None]:
     processor = ProcessInstanceProcessor(process_instance)
-    reported_ids = []  # bit of an issue with end tasks showing as getting completed twice.
-    spiff_task = processor.next_task()
-    task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
-    last_task = None
-    while last_task != spiff_task:
-        task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
-        extensions = TaskService.get_extensions_from_task_model(task_model)
-        instructions = _render_instructions_for_end_user(task_model, extensions)
-        if instructions and spiff_task.id not in reported_ids:
-            reported_ids.append(spiff_task.id)
-            task.properties = extensions
-            yield f"data: {current_app.json.dumps(task)} \n\n"
-        last_task = spiff_task
-        try:
-            processor.do_engine_steps(execution_strategy_name="one_at_a_time")
-            processor.do_engine_steps(execution_strategy_name="run_until_user_message")
-            processor.save()  # Fixme - maybe find a way not to do this on every loop?
-        except WorkflowTaskException as wfe:
-            api_error = ApiError.from_workflow_exception(
-                "engine_steps_error", "Failed complete an automated task.", exp=wfe
-            )
-            yield f"data: {current_app.json.dumps(api_error)} \n\n"
-        except Exception as e:
-            api_error = ApiError(
-                error_code="engine_steps_error",
-                message=f"Failed complete an automated task. Error was: {str(e)}",
-                status_code=400,
-            )
-            yield f"data: {current_app.json.dumps(api_error)} \n\n"
+    reported_ids = []  # A list of all the task ids reported by this endpoint so far.
+
+    def get_reportable_tasks() -> list[SpiffTask]:
+        return processor.bpmn_process_instance.get_tasks(
+            TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR
+        )
+
+    tasks = get_reportable_tasks()
+    while True:
+        for spiff_task in tasks:
+            task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
+            extensions = TaskService.get_extensions_from_task_model(task_model)
+            instructions = _render_instructions_for_end_user(task_model, extensions)
+            if instructions and spiff_task.id not in reported_ids:
+                task = ProcessInstanceService.spiff_task_to_api_task(processor, spiff_task)
+                task.properties = extensions
+                yield f"data: {current_app.json.dumps(task)} \n\n"
+                reported_ids.append(spiff_task.id)
+            if spiff_task.state == TaskState.READY:
+                try:
+                    processor.do_engine_steps(execution_strategy_name="run_until_user_message")
+                    processor.save()  # Fixme - maybe find a way not to do this on every loop?
+                except WorkflowTaskException as wfe:
+                    api_error = ApiError.from_workflow_exception(
+                        "engine_steps_error", "Failed to complete an automated task.", exp=wfe
+                    )
+                    yield f"data: {current_app.json.dumps(api_error)} \n\n"
+                    return
+                except Exception as e:
+                    api_error = ApiError(
+                        error_code="engine_steps_error",
+                        message=f"Failed to complete an automated task. Error was: {str(e)}",
+                        status_code=400,
+                    )
+                    yield f"data: {current_app.json.dumps(api_error)} \n\n"
+                    return
+        processor.bpmn_process_instance.refresh_waiting_tasks()
+        ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance)
+        tasks = get_reportable_tasks()  # Refresh so tasks reached by the engine steps above get reported.
+        if ready_engine_task_count == 0:
+            break  # No more tasks to report
+
+    task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
+    if task.id not in reported_ids:
+        yield f"data: {current_app.json.dumps(task)} \n\n"
 
     # Note, this has to be done in case someone leaves the page,
     # which can otherwise cancel this function and leave completed tasks un-registered.
     spiff_task = processor.next_task()
     task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
-    # Always provide some response, in the event no instructions were provided.
-    if len(reported_ids) == 0:
-        task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
-        yield f"data: {current_app.json.dumps(task)} \n\n"
-
+
+
+def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int:
+    return len(
+        [
+            t
+            for t in bpmn_process_instance.get_tasks(TaskState.READY)
+            if bpmn_process_instance._is_engine_task(t.task_spec)
+        ]
+    )
 
 
 def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Optional[str], None]:
     process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
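The frames yielded by `_interstitial_stream` above follow the server-sent-events convention: a `data: <json>` line terminated by a blank line. As a rough illustration, a minimal client could consume the stream as sketched below; the endpoint path and auth header are hypothetical stand-ins, and only the framing is taken from the generator itself.

```python
# Minimal sketch of an SSE consumer for the interstitial stream.
# The URL and bearer token are hypothetical; only the "data: {...}\n\n"
# framing comes from _interstitial_stream in the diff above.
import json

import requests

response = requests.get(
    "http://localhost:7000/v1.0/process-instances/42/interstitial",  # hypothetical path
    headers={"Authorization": "Bearer <access-token>"},  # hypothetical auth
    stream=True,
)
for line in response.iter_lines(decode_unicode=True):
    if line and line.startswith("data: "):
        payload = json.loads(line[len("data: "):])
        # Each payload is either a serialized task (with "properties" holding
        # the rendered extensions) or an ApiError dict, per the generator above.
        print(payload)
```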
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 45d27509c..017d9e79c 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -423,9 +423,9 @@ class ProcessInstanceProcessor:
         tld.process_instance_id = process_instance_model.id
 
         # we want this to be the fully qualified path to the process model including all group subcomponents
-        current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = (
-            f"{process_instance_model.process_model_identifier}"
-        )
+        current_app.config[
+            "THREAD_LOCAL_DATA"
+        ].process_model_identifier = f"{process_instance_model.process_model_identifier}"
 
         self.process_instance_model = process_instance_model
         self.process_model_service = ProcessModelService()
@@ -585,9 +585,9 @@ class ProcessInstanceProcessor:
                 bpmn_subprocess_definition.bpmn_identifier
             ] = bpmn_process_definition_dict
             spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
-            bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
-                bpmn_subprocess_definition.bpmn_identifier
-            )
+            bpmn_subprocess_definition_bpmn_identifiers[
+                bpmn_subprocess_definition.id
+            ] = bpmn_subprocess_definition.bpmn_identifier
 
         task_definitions = TaskDefinitionModel.query.filter(
             TaskDefinitionModel.bpmn_process_definition_id.in_(  # type: ignore
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
index 36fad80ae..14441475e 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
@@ -220,14 +220,15 @@ class TaskService:
         if task_model.state == "COMPLETED":
             event_type = ProcessInstanceEventType.task_completed.value
             timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
-            process_instance_event, _process_instance_error_detail = (
-                ProcessInstanceTmpService.add_event_to_process_instance(
-                    self.process_instance,
-                    event_type,
-                    task_guid=task_model.guid,
-                    timestamp=timestamp,
-                    add_to_db_session=False,
-                )
+            (
+                process_instance_event,
+                _process_instance_error_detail,
+            ) = ProcessInstanceTmpService.add_event_to_process_instance(
+                self.process_instance,
+                event_type,
+                task_guid=task_model.guid,
+                timestamp=timestamp,
+                add_to_db_session=False,
             )
             self.process_instance_events[task_model.guid] = process_instance_event
@@ -454,7 +455,6 @@ class TaskService:
                 spiff_task,
                 self.bpmn_definition_to_task_definitions_mappings,
             )
-            self.update_task_model(task_model, spiff_task)
             self.task_models[task_model.guid] = task_model
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index 91f362a10..859e8aede 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -299,28 +299,33 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
 
 
 class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy):
-    """When you want to run tasks until you hit something to report to the end user."""
+    """When you want to run tasks until you hit something to report to the end user.
 
-    def get_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
-        return list(
-            [
-                t
-                for t in bpmn_process_instance.get_tasks(TaskState.READY)
-                if t.task_spec.spec_type not in ["User Task", "Manual Task"]
-                and not (
-                    hasattr(t.task_spec, "extensions") and t.task_spec.extensions.get("instructionsForEndUser", None)
-                )
-            ]
-        )
+    Note that this will run at least one engine step if possible,
+    but will stop if it hits instructions after the first task.
+    """
 
     def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
-        engine_steps = self.get_engine_steps(bpmn_process_instance)
-        while engine_steps:
+        bpmn_process_instance.refresh_waiting_tasks()
+        engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
+        if len(engine_steps) > 0:
+            self.delegate.will_complete_task(engine_steps[0])
+            engine_steps[0].run()
+            self.delegate.did_complete_task(engine_steps[0])
+            bpmn_process_instance.refresh_waiting_tasks()
+
+        engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
+        should_continue = True
+        while engine_steps and should_continue:
             for task in engine_steps:
+                if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get(
+                    "instructionsForEndUser", None
+                ):
+                    should_continue = False
+                    break
                 self.delegate.will_complete_task(task)
                 task.run()
                 self.delegate.did_complete_task(task)
-            engine_steps = self.get_engine_steps(bpmn_process_instance)
+            engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
         self.delegate.after_engine_steps(bpmn_process_instance)
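`spiff_run` now relies on `self.get_ready_engine_steps`, which this diff does not show; presumably it lives on the `ExecutionStrategy` base class. A sketch of what it plausibly looks like, mirroring `get_ready_engine_step_count` in tasks_controller.py above; this is an assumption, not the confirmed implementation:

```python
# Assumed shape of the helper on the ExecutionStrategy base class; it is not
# shown in this diff. It mirrors get_ready_engine_step_count above: READY
# tasks that the engine itself should run, per SpiffWorkflow's _is_engine_task.
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow  # type: ignore
from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
from SpiffWorkflow.task import TaskState


class ExecutionStrategy:
    def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
        return [
            t
            for t in bpmn_process_instance.get_tasks(TaskState.READY)
            if bpmn_process_instance._is_engine_task(t.task_spec)
        ]
```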
diff --git a/spiffworkflow-frontend/src/components/ProcessInstanceListTable.tsx b/spiffworkflow-frontend/src/components/ProcessInstanceListTable.tsx
index ccd620756..778c1713d 100644
--- a/spiffworkflow-frontend/src/components/ProcessInstanceListTable.tsx
+++ b/spiffworkflow-frontend/src/components/ProcessInstanceListTable.tsx
@@ -1408,6 +1408,7 @@ export default function ProcessInstanceListTable({
     );
   };
 
+  // eslint-disable-next-line sonarjs/cognitive-complexity
   const buildTable = () => {
     const headerLabels: Record<string, string> = {
       id: 'Id',
@@ -1449,11 +1450,11 @@ export default function ProcessInstanceListTable({
       buttonElement = (
      );
      currentRow.push(<td>{buttonElement}</td>);
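For completeness, this is roughly how a generator such as `_dequeued_interstitial_stream` gets exposed as a streaming response. The real route appears to be wired through the backend's API spec rather than a decorator, so everything below is illustrative only:

```python
# Illustrative wiring only: the route decorator and app object are stand-ins,
# and the generator body is a stub for the real _dequeued_interstitial_stream
# defined in tasks_controller.py above.
import json
from typing import Generator, Optional

from flask import Flask, stream_with_context
from flask.wrappers import Response

app = Flask(__name__)


def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Optional[str], None]:
    # Stub standing in for the real generator in tasks_controller.py.
    yield f"data: {json.dumps({'process_instance_id': process_instance_id})} \n\n"


@app.route("/process-instances/<int:process_instance_id>/interstitial")  # hypothetical route
def interstitial(process_instance_id: int) -> Response:
    # stream_with_context keeps the request context alive while frames stream out.
    return Response(
        stream_with_context(_dequeued_interstitial_stream(process_instance_id)),
        mimetype="text/event-stream",
    )
```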