From 385e8cea2bd3dcf7d9eeb1520bf81ef8b6b66c30 Mon Sep 17 00:00:00 2001 From: jasquat Date: Tue, 29 Nov 2022 15:59:46 -0500 Subject: [PATCH] WIP more metadata reporting w/ burnettk --- spiffworkflow-backend/: | 1908 +++++++++++++++++ .../routes/process_api_blueprint.py | 49 +- .../services/authentication_service.py | 5 +- .../process_instance_report_service.py | 19 + .../integration/test_process_api.py | 7 +- 5 files changed, 1977 insertions(+), 11 deletions(-) create mode 100644 spiffworkflow-backend/: diff --git a/spiffworkflow-backend/: b/spiffworkflow-backend/: new file mode 100644 index 00000000..5516fdae --- /dev/null +++ b/spiffworkflow-backend/: @@ -0,0 +1,1908 @@ +"""APIs for dealing with process groups, process models, and process instances.""" +import json +import random +import string +import uuid +from typing import Any +from typing import Dict +from typing import Optional +from typing import TypedDict +from typing import Union + +import connexion # type: ignore +import flask.wrappers +import jinja2 +from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel +import werkzeug +from flask import Blueprint +from flask import current_app +from flask import g +from flask import jsonify +from flask import make_response +from flask import redirect +from flask import request +from flask.wrappers import Response +from flask_bpmn.api.api_error import ApiError +from flask_bpmn.models.db import db +from lxml import etree # type: ignore +from lxml.builder import ElementMaker # type: ignore +from SpiffWorkflow.task import Task as SpiffTask # type: ignore +from SpiffWorkflow.task import TaskState +from sqlalchemy import and_ +from sqlalchemy import asc +from sqlalchemy import desc +from sqlalchemy.orm import aliased, joinedload + +from spiffworkflow_backend.exceptions.process_entity_not_found_error import ( + ProcessEntityNotFoundError, +) +from spiffworkflow_backend.models.active_task import ActiveTaskModel +from spiffworkflow_backend.models.active_task_user import ActiveTaskUserModel +from spiffworkflow_backend.models.file import FileSchema +from spiffworkflow_backend.models.group import GroupModel +from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel +from spiffworkflow_backend.models.message_instance import MessageInstanceModel +from spiffworkflow_backend.models.message_model import MessageModel +from spiffworkflow_backend.models.message_triggerable_process_model import ( + MessageTriggerableProcessModel, +) +from spiffworkflow_backend.models.principal import PrincipalModel +from spiffworkflow_backend.models.process_group import ProcessGroup +from spiffworkflow_backend.models.process_group import ProcessGroupSchema +from spiffworkflow_backend.models.process_instance import ProcessInstanceApiSchema +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema +from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_report import ( + ProcessInstanceReportModel, +) +from spiffworkflow_backend.models.process_model import ProcessModelInfo +from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema +from spiffworkflow_backend.models.secret_model import SecretModel +from spiffworkflow_backend.models.secret_model import SecretModelSchema +from spiffworkflow_backend.models.spec_reference import SpecReferenceCache +from spiffworkflow_backend.models.spec_reference import SpecReferenceSchema +from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel +from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel +from spiffworkflow_backend.models.user import UserModel +from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel +from spiffworkflow_backend.routes.user import verify_token +from spiffworkflow_backend.services.authorization_service import AuthorizationService +from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService +from spiffworkflow_backend.services.git_service import GitService +from spiffworkflow_backend.services.message_service import MessageService +from spiffworkflow_backend.services.process_instance_processor import ( + ProcessInstanceProcessor, +) +from spiffworkflow_backend.services.process_instance_report_service import ( + ProcessInstanceReportFilter, +) +from spiffworkflow_backend.services.process_instance_report_service import ( + ProcessInstanceReportService, +) +from spiffworkflow_backend.services.process_instance_service import ( + ProcessInstanceService, +) +from spiffworkflow_backend.services.process_model_service import ProcessModelService +from spiffworkflow_backend.services.script_unit_test_runner import ScriptUnitTestRunner +from spiffworkflow_backend.services.secret_service import SecretService +from spiffworkflow_backend.services.service_task_service import ServiceTaskService +from spiffworkflow_backend.services.spec_file_service import SpecFileService +from spiffworkflow_backend.services.user_service import UserService + + +class TaskDataSelectOption(TypedDict): + """TaskDataSelectOption.""" + + value: str + label: str + + +class ReactJsonSchemaSelectOption(TypedDict): + """ReactJsonSchemaSelectOption.""" + + type: str + title: str + enum: list[str] + + +process_api_blueprint = Blueprint("process_api", __name__) + + +def status() -> flask.wrappers.Response: + """Status.""" + ProcessInstanceModel.query.filter().first() + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def permissions_check(body: Dict[str, Dict[str, list[str]]]) -> flask.wrappers.Response: + """Permissions_check.""" + if "requests_to_check" not in body: + raise ( + ApiError( + error_code="could_not_requests_to_check", + message="The key 'requests_to_check' not found at root of request body.", + status_code=400, + ) + ) + + response_dict: dict[str, dict[str, bool]] = {} + requests_to_check = body["requests_to_check"] + + for target_uri, http_methods in requests_to_check.items(): + if target_uri not in response_dict: + response_dict[target_uri] = {} + + for http_method in http_methods: + permission_string = AuthorizationService.get_permission_from_http_method( + http_method + ) + if permission_string: + has_permission = AuthorizationService.user_has_permission( + user=g.user, + permission=permission_string, + target_uri=target_uri, + ) + response_dict[target_uri][http_method] = has_permission + + return make_response(jsonify({"results": response_dict}), 200) + + +def modify_process_model_id(process_model_id: str) -> str: + """Modify_process_model_id.""" + return process_model_id.replace("/", ":") + + +def un_modify_modified_process_model_id(modified_process_model_id: str) -> str: + """Un_modify_modified_process_model_id.""" + return modified_process_model_id.replace(":", "/") + + +def process_group_add(body: dict) -> flask.wrappers.Response: + """Add_process_group.""" + process_group = ProcessGroup(**body) + ProcessModelService.add_process_group(process_group) + return make_response(jsonify(process_group), 201) + + +def process_group_delete(modified_process_group_id: str) -> flask.wrappers.Response: + """Process_group_delete.""" + process_group_id = un_modify_modified_process_model_id(modified_process_group_id) + ProcessModelService().process_group_delete(process_group_id) + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_group_update( + modified_process_group_id: str, body: dict +) -> flask.wrappers.Response: + """Process Group Update.""" + body_include_list = ["display_name", "description"] + body_filtered = { + include_item: body[include_item] + for include_item in body_include_list + if include_item in body + } + + process_group_id = un_modify_modified_process_model_id(modified_process_group_id) + process_group = ProcessGroup(id=process_group_id, **body_filtered) + ProcessModelService.update_process_group(process_group) + return make_response(jsonify(process_group), 200) + + +def process_group_list( + process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100 +) -> flask.wrappers.Response: + """Process_group_list.""" + if process_group_identifier is not None: + process_groups = ProcessModelService.get_process_groups( + process_group_identifier + ) + else: + process_groups = ProcessModelService.get_process_groups() + batch = ProcessModelService().get_batch( + items=process_groups, page=page, per_page=per_page + ) + pages = len(process_groups) // per_page + remainder = len(process_groups) % per_page + if remainder > 0: + pages += 1 + + response_json = { + "results": ProcessGroupSchema(many=True).dump(batch), + "pagination": { + "count": len(batch), + "total": len(process_groups), + "pages": pages, + }, + } + return Response(json.dumps(response_json), status=200, mimetype="application/json") + + +def process_group_show( + modified_process_group_id: str, +) -> Any: + """Process_group_show.""" + process_group_id = un_modify_modified_process_model_id(modified_process_group_id) + try: + process_group = ProcessModelService.get_process_group(process_group_id) + except ProcessEntityNotFoundError as exception: + raise ( + ApiError( + error_code="process_group_cannot_be_found", + message=f"Process group cannot be found: {process_group_id}", + status_code=400, + ) + ) from exception + + process_group.parent_groups = ProcessModelService.get_parent_group_array( + process_group.id + ) + return make_response(jsonify(process_group), 200) + + +def process_group_move( + modified_process_group_identifier: str, new_location: str +) -> flask.wrappers.Response: + """Process_group_move.""" + original_process_group_id = un_modify_modified_process_model_id( + modified_process_group_identifier + ) + new_process_group = ProcessModelService().process_group_move( + original_process_group_id, new_location + ) + return make_response(jsonify(new_process_group), 201) + + +def process_model_create( + modified_process_group_id: str, body: Dict[str, Union[str, bool, int]] +) -> flask.wrappers.Response: + """Process_model_create.""" + process_model_info = ProcessModelInfoSchema().load(body) + if modified_process_group_id is None: + raise ApiError( + error_code="process_group_id_not_specified", + message="Process Model could not be created when process_group_id path param is unspecified", + status_code=400, + ) + if process_model_info is None: + raise ApiError( + error_code="process_model_could_not_be_created", + message=f"Process Model could not be created from given body: {body}", + status_code=400, + ) + + unmodified_process_group_id = un_modify_modified_process_model_id( + modified_process_group_id + ) + process_group = ProcessModelService.get_process_group(unmodified_process_group_id) + if process_group is None: + raise ApiError( + error_code="process_model_could_not_be_created", + message=f"Process Model could not be created from given body because Process Group could not be found: {body}", + status_code=400, + ) + + ProcessModelService.add_process_model(process_model_info) + return Response( + json.dumps(ProcessModelInfoSchema().dump(process_model_info)), + status=201, + mimetype="application/json", + ) + + +def process_model_delete( + modified_process_model_identifier: str, +) -> flask.wrappers.Response: + """Process_model_delete.""" + process_model_identifier = modified_process_model_identifier.replace(":", "/") + # process_model_identifier = f"{process_group_id}/{process_model_id}" + ProcessModelService().process_model_delete(process_model_identifier) + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_model_update( + modified_process_model_identifier: str, body: Dict[str, Union[str, bool, int]] +) -> Any: + """Process_model_update.""" + process_model_identifier = modified_process_model_identifier.replace(":", "/") + body_include_list = [ + "display_name", + "primary_file_name", + "primary_process_id", + "description", + ] + body_filtered = { + include_item: body[include_item] + for include_item in body_include_list + if include_item in body + } + + # process_model_identifier = f"{process_group_id}/{process_model_id}" + process_model = get_process_model(process_model_identifier) + ProcessModelService.update_process_model(process_model, body_filtered) + return ProcessModelInfoSchema().dump(process_model) + + +def process_model_show(modified_process_model_identifier: str) -> Any: + """Process_model_show.""" + process_model_identifier = modified_process_model_identifier.replace(":", "/") + # process_model_identifier = f"{process_group_id}/{process_model_id}" + process_model = get_process_model(process_model_identifier) + # TODO: Temporary. Should not need the next line once models have correct ids + # process_model.id = process_model_identifier + files = sorted(SpecFileService.get_files(process_model)) + process_model.files = files + for file in process_model.files: + file.references = SpecFileService.get_references_for_file(file, process_model) + + process_model.parent_groups = ProcessModelService.get_parent_group_array( + process_model.id + ) + return make_response(jsonify(process_model), 200) + + +def process_model_move( + modified_process_model_identifier: str, new_location: str +) -> flask.wrappers.Response: + """Process_model_move.""" + original_process_model_id = un_modify_modified_process_model_id( + modified_process_model_identifier + ) + new_process_model = ProcessModelService().process_model_move( + original_process_model_id, new_location + ) + return make_response(jsonify(new_process_model), 201) + + +def process_model_list( + process_group_identifier: Optional[str] = None, + recursive: Optional[bool] = False, + filter_runnable_by_user: Optional[bool] = False, + page: int = 1, + per_page: int = 100, +) -> flask.wrappers.Response: + """Process model list!""" + process_models = ProcessModelService.get_process_models( + process_group_id=process_group_identifier, + recursive=recursive, + filter_runnable_by_user=filter_runnable_by_user, + ) + batch = ProcessModelService().get_batch( + process_models, page=page, per_page=per_page + ) + pages = len(process_models) // per_page + remainder = len(process_models) % per_page + if remainder > 0: + pages += 1 + response_json = { + "results": ProcessModelInfoSchema(many=True).dump(batch), + "pagination": { + "count": len(batch), + "total": len(process_models), + "pages": pages, + }, + } + return Response(json.dumps(response_json), status=200, mimetype="application/json") + + +def process_list() -> Any: + """Returns a list of all known processes. + + This includes processes that are not the + primary process - helpful for finding possible call activities. + """ + references = SpecReferenceCache.query.filter_by(type="process").all() + return SpecReferenceSchema(many=True).dump(references) + + +def get_file(modified_process_model_id: str, file_name: str) -> Any: + """Get_file.""" + process_model_identifier = modified_process_model_id.replace(":", "/") + process_model = get_process_model(process_model_identifier) + files = SpecFileService.get_files(process_model, file_name) + if len(files) == 0: + raise ApiError( + error_code="unknown file", + message=f"No information exists for file {file_name}" + f" it does not exist in workflow {process_model_identifier}.", + status_code=404, + ) + + file = files[0] + file_contents = SpecFileService.get_data(process_model, file.name) + file.file_contents = file_contents + file.process_model_id = process_model.id + # file.process_group_id = process_model.process_group_id + return FileSchema().dump(file) + + +def process_model_file_update( + modified_process_model_id: str, file_name: str +) -> flask.wrappers.Response: + """Process_model_file_update.""" + process_model_identifier = modified_process_model_id.replace(":", "/") + # process_model_identifier = f"{process_group_id}/{process_model_id}" + process_model = get_process_model(process_model_identifier) + + request_file = get_file_from_request() + request_file_contents = request_file.stream.read() + if not request_file_contents: + raise ApiError( + error_code="file_contents_empty", + message="Given request file does not have any content", + status_code=400, + ) + + SpecFileService.update_file(process_model, file_name, request_file_contents) + + if current_app.config["GIT_COMMIT_ON_SAVE"]: + git_output = GitService.commit( + message=f"User: {g.user.username} clicked save for {process_model_identifier}/{file_name}" + ) + current_app.logger.info(f"git output: {git_output}") + else: + current_app.logger.info("Git commit on save is disabled") + + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_model_file_delete( + modified_process_model_id: str, file_name: str +) -> flask.wrappers.Response: + """Process_model_file_delete.""" + process_model_identifier = modified_process_model_id.replace(":", "/") + process_model = get_process_model(process_model_identifier) + try: + SpecFileService.delete_file(process_model, file_name) + except FileNotFoundError as exception: + raise ( + ApiError( + error_code="process_model_file_cannot_be_found", + message=f"Process model file cannot be found: {file_name}", + status_code=400, + ) + ) from exception + + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def add_file(modified_process_model_id: str) -> flask.wrappers.Response: + """Add_file.""" + process_model_identifier = modified_process_model_id.replace(":", "/") + process_model = get_process_model(process_model_identifier) + request_file = get_file_from_request() + if not request_file.filename: + raise ApiError( + error_code="could_not_get_filename", + message="Could not get filename from request", + status_code=400, + ) + + file = SpecFileService.add_file( + process_model, request_file.filename, request_file.stream.read() + ) + file_contents = SpecFileService.get_data(process_model, file.name) + file.file_contents = file_contents + file.process_model_id = process_model.id + return Response( + json.dumps(FileSchema().dump(file)), status=201, mimetype="application/json" + ) + + +def process_instance_create(modified_process_model_id: str) -> flask.wrappers.Response: + """Create_process_instance.""" + process_model_identifier = un_modify_modified_process_model_id( + modified_process_model_id + ) + process_instance = ( + ProcessInstanceService.create_process_instance_from_process_model_identifier( + process_model_identifier, g.user + ) + ) + return Response( + json.dumps(ProcessInstanceModelSchema().dump(process_instance)), + status=201, + mimetype="application/json", + ) + + +def process_instance_run( + modified_process_model_identifier: str, + process_instance_id: int, + do_engine_steps: bool = True, +) -> flask.wrappers.Response: + """Process_instance_run.""" + process_instance = ProcessInstanceService().get_process_instance( + process_instance_id + ) + processor = ProcessInstanceProcessor(process_instance) + + if do_engine_steps: + try: + processor.do_engine_steps() + except ApiError as e: + ErrorHandlingService().handle_error(processor, e) + raise e + except Exception as e: + ErrorHandlingService().handle_error(processor, e) + task = processor.bpmn_process_instance.last_task + raise ApiError.from_task( + error_code="unknown_exception", + message=f"An unknown error occurred. Original error: {e}", + status_code=400, + task=task, + ) from e + processor.save() + + if not current_app.config["RUN_BACKGROUND_SCHEDULER"]: + MessageService.process_message_instances() + + process_instance_api = ProcessInstanceService.processor_to_process_instance_api( + processor + ) + process_instance_data = processor.get_data() + process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api) + process_instance_metadata["data"] = process_instance_data + return Response( + json.dumps(process_instance_metadata), status=200, mimetype="application/json" + ) + + +def process_instance_terminate( + process_instance_id: int, +) -> flask.wrappers.Response: + """Process_instance_run.""" + process_instance = ProcessInstanceService().get_process_instance( + process_instance_id + ) + processor = ProcessInstanceProcessor(process_instance) + processor.terminate() + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_suspend( + process_instance_id: int, +) -> flask.wrappers.Response: + """Process_instance_suspend.""" + process_instance = ProcessInstanceService().get_process_instance( + process_instance_id + ) + processor = ProcessInstanceProcessor(process_instance) + processor.suspend() + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_resume( + process_instance_id: int, +) -> flask.wrappers.Response: + """Process_instance_resume.""" + process_instance = ProcessInstanceService().get_process_instance( + process_instance_id + ) + processor = ProcessInstanceProcessor(process_instance) + processor.resume() + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_log_list( + process_instance_id: int, + page: int = 1, + per_page: int = 100, +) -> flask.wrappers.Response: + """Process_instance_log_list.""" + # to make sure the process instance exists + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + + logs = ( + SpiffLoggingModel.query.filter( + SpiffLoggingModel.process_instance_id == process_instance.id + ) + .order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore + .join( + UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True + ) # isouter since if we don't have a user, we still want the log + .add_columns( + UserModel.username, + ) + .paginate(page=page, per_page=per_page, error_out=False) + ) + + response_json = { + "results": logs.items, + "pagination": { + "count": len(logs.items), + "total": logs.total, + "pages": logs.pages, + }, + } + + return make_response(jsonify(response_json), 200) + + +def message_instance_list( + process_instance_id: Optional[int] = None, + page: int = 1, + per_page: int = 100, +) -> flask.wrappers.Response: + """Message_instance_list.""" + # to make sure the process instance exists + message_instances_query = MessageInstanceModel.query + + if process_instance_id: + message_instances_query = message_instances_query.filter_by( + process_instance_id=process_instance_id + ) + + message_instances = ( + message_instances_query.order_by( + MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore + MessageInstanceModel.id.desc(), # type: ignore + ) + .join(MessageModel, MessageModel.id == MessageInstanceModel.message_model_id) + .join(ProcessInstanceModel) + .add_columns( + MessageModel.identifier.label("message_identifier"), + ProcessInstanceModel.process_model_identifier, + ) + .paginate(page=page, per_page=per_page, error_out=False) + ) + + for message_instance in message_instances: + message_correlations: dict = {} + for ( + mcmi + ) in ( + message_instance.MessageInstanceModel.message_correlations_message_instances + ): + mc = MessageCorrelationModel.query.filter_by( + id=mcmi.message_correlation_id + ).all() + for m in mc: + if m.name not in message_correlations: + message_correlations[m.name] = {} + message_correlations[m.name][ + m.message_correlation_property.identifier + ] = m.value + message_instance.MessageInstanceModel.message_correlations = ( + message_correlations + ) + + response_json = { + "results": message_instances.items, + "pagination": { + "count": len(message_instances.items), + "total": message_instances.total, + "pages": message_instances.pages, + }, + } + + return make_response(jsonify(response_json), 200) + + +# body: { +# payload: dict, +# process_instance_id: Optional[int], +# } +def message_start( + message_identifier: str, + body: Dict[str, Any], +) -> flask.wrappers.Response: + """Message_start.""" + message_model = MessageModel.query.filter_by(identifier=message_identifier).first() + if message_model is None: + raise ( + ApiError( + error_code="unknown_message", + message=f"Could not find message with identifier: {message_identifier}", + status_code=404, + ) + ) + + if "payload" not in body: + raise ( + ApiError( + error_code="missing_payload", + message="Body is missing payload.", + status_code=400, + ) + ) + + process_instance = None + if "process_instance_id" in body: + # to make sure we have a valid process_instance_id + process_instance = find_process_instance_by_id_or_raise( + body["process_instance_id"] + ) + + message_instance = MessageInstanceModel.query.filter_by( + process_instance_id=process_instance.id, + message_model_id=message_model.id, + message_type="receive", + status="ready", + ).first() + if message_instance is None: + raise ( + ApiError( + error_code="cannot_find_waiting_message", + message=f"Could not find waiting message for identifier {message_identifier} " + f"and process instance {process_instance.id}", + status_code=400, + ) + ) + MessageService.process_message_receive( + message_instance, message_model.name, body["payload"] + ) + + else: + message_triggerable_process_model = ( + MessageTriggerableProcessModel.query.filter_by( + message_model_id=message_model.id + ).first() + ) + + if message_triggerable_process_model is None: + raise ( + ApiError( + error_code="cannot_start_message", + message=f"Message with identifier cannot be start with message: {message_identifier}", + status_code=400, + ) + ) + + process_instance = MessageService.process_message_triggerable_process_model( + message_triggerable_process_model, + message_model.name, + body["payload"], + g.user, + ) + + return Response( + json.dumps(ProcessInstanceModelSchema().dump(process_instance)), + status=200, + mimetype="application/json", + ) + + +def process_instance_list( + process_model_identifier: Optional[str] = None, + page: int = 1, + per_page: int = 100, + start_from: Optional[int] = None, + start_to: Optional[int] = None, + end_from: Optional[int] = None, + end_to: Optional[int] = None, + process_status: Optional[str] = None, + initiated_by_me: Optional[bool] = None, + with_tasks_completed_by_me: Optional[bool] = None, + with_tasks_completed_by_my_group: Optional[bool] = None, + user_filter: Optional[bool] = False, + report_identifier: Optional[str] = None, +) -> flask.wrappers.Response: + """Process_instance_list.""" + process_instance_report = ProcessInstanceReportService.report_with_identifier( + g.user, report_identifier + ) + + if user_filter: + report_filter = ProcessInstanceReportFilter( + process_model_identifier, + start_from, + start_to, + end_from, + end_to, + process_status.split(",") if process_status else None, + initiated_by_me, + with_tasks_completed_by_me, + with_tasks_completed_by_my_group, + ) + else: + report_filter = ( + ProcessInstanceReportService.filter_from_metadata_with_overrides( + process_instance_report, + process_model_identifier, + start_from, + start_to, + end_from, + end_to, + process_status, + initiated_by_me, + with_tasks_completed_by_me, + with_tasks_completed_by_my_group, + ) + ) + + # process_model_identifier = un_modify_modified_process_model_id(modified_process_model_identifier) + process_instance_query = ProcessInstanceModel.query + # Always join that hot user table for good performance at serialization time. + process_instance_query = process_instance_query.options( + joinedload(ProcessInstanceModel.process_initiator) + ) + + if report_filter.process_model_identifier is not None: + process_model = get_process_model( + f"{report_filter.process_model_identifier}", + ) + + process_instance_query = process_instance_query.filter_by( + process_model_identifier=process_model.id + ) + + # this can never happen. obviously the class has the columns it defines. this is just to appease mypy. + if ( + ProcessInstanceModel.start_in_seconds is None + or ProcessInstanceModel.end_in_seconds is None + ): + raise ( + ApiError( + error_code="unexpected_condition", + message="Something went very wrong", + status_code=500, + ) + ) + + if report_filter.start_from is not None: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.start_in_seconds >= report_filter.start_from + ) + if report_filter.start_to is not None: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.start_in_seconds <= report_filter.start_to + ) + if report_filter.end_from is not None: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.end_in_seconds >= report_filter.end_from + ) + if report_filter.end_to is not None: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.end_in_seconds <= report_filter.end_to + ) + if report_filter.process_status is not None: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.status.in_(report_filter.process_status) # type: ignore + ) + + if report_filter.initiated_by_me is True: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.status.in_(["complete", "error", "terminated"]) # type: ignore + ) + process_instance_query = process_instance_query.filter_by( + process_initiator=g.user + ) + + # TODO: not sure if this is exactly what is wanted + if report_filter.with_tasks_completed_by_me is True: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.status.in_(["complete", "error", "terminated"]) # type: ignore + ) + # process_instance_query = process_instance_query.join(UserModel, UserModel.id == ProcessInstanceModel.process_initiator_id) + # process_instance_query = process_instance_query.add_columns(UserModel.username) + # search for process_instance.UserModel.username in this file for more details about why adding columns is annoying. + + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.process_initiator_id != g.user.id + ) + process_instance_query = process_instance_query.join( + SpiffStepDetailsModel, + ProcessInstanceModel.id == SpiffStepDetailsModel.process_instance_id, + ) + process_instance_query = process_instance_query.join( + SpiffLoggingModel, + ProcessInstanceModel.id == SpiffLoggingModel.process_instance_id, + ) + process_instance_query = process_instance_query.filter( + SpiffLoggingModel.message.contains("COMPLETED") # type: ignore + ) + process_instance_query = process_instance_query.filter( + SpiffLoggingModel.spiff_step == SpiffStepDetailsModel.spiff_step + ) + process_instance_query = process_instance_query.filter( + SpiffStepDetailsModel.completed_by_user_id == g.user.id + ) + + if report_filter.with_tasks_completed_by_my_group is True: + process_instance_query = process_instance_query.filter( + ProcessInstanceModel.status.in_(["complete", "error", "terminated"]) # type: ignore + ) + process_instance_query = process_instance_query.join( + SpiffStepDetailsModel, + ProcessInstanceModel.id == SpiffStepDetailsModel.process_instance_id, + ) + process_instance_query = process_instance_query.join( + SpiffLoggingModel, + ProcessInstanceModel.id == SpiffLoggingModel.process_instance_id, + ) + process_instance_query = process_instance_query.filter( + SpiffLoggingModel.message.contains("COMPLETED") # type: ignore + ) + process_instance_query = process_instance_query.filter( + SpiffLoggingModel.spiff_step == SpiffStepDetailsModel.spiff_step + ) + process_instance_query = process_instance_query.join( + GroupModel, + GroupModel.id == SpiffStepDetailsModel.lane_assignment_id, + ) + process_instance_query = process_instance_query.join( + UserGroupAssignmentModel, + UserGroupAssignmentModel.group_id == GroupModel.id, + ) + process_instance_query = process_instance_query.filter( + UserGroupAssignmentModel.user_id == g.user.id + ) + + # userSkillF = aliased(UserSkill) + # userSkillI = aliased(UserSkill) + + import pdb; pdb.set_trace() + for column in process_instance_report.report_metadata['columns']: + print(f"column: {column['accessor']}") + # process_instance_query = process_instance_query.outerjoin(ProcessInstanceMetadataModel, ProcessInstanceModel.id == ProcessInstanceMetadataModel.process_instance_id, ProcessInstanceMetadataModel.key == column['accessor']) + instance_metadata_alias = alias(ProcessInstanceMetadataModel) + process_instance_query = ( + process_instance_query.outerjoin(instance_metadata_alias, ProcessInstanceModel.id == instance_metadata_alias.process_instance_id) + .add_column(ProcessInstanceMetadataModel.value.label(column['accessor'])) + ) + import pdb; pdb.set_trace() + + process_instances = ( + process_instance_query.group_by(ProcessInstanceModel.id) + .order_by( + ProcessInstanceModel.start_in_seconds.desc(), ProcessInstanceModel.id.desc() # type: ignore + ) + .paginate(page=page, per_page=per_page, error_out=False) + ) + import pdb; pdb.set_trace() + + # def awesome_serialize(process_instance) + # dict_thing = process_instance.serialize + # + # # add columns since we have access to columns here + # dict_thing['awesome'] = 'awesome' + # + # return dict_thing + + results = list( + map( + ProcessInstanceService.serialize_flat_with_task_data, + process_instances.items, + ) + ) + report_metadata = process_instance_report.report_metadata + + response_json = { + "report_identifier": process_instance_report.identifier, + "report_metadata": report_metadata, + "results": results, + "filters": report_filter.to_dict(), + "pagination": { + "count": len(results), + "total": process_instances.total, + "pages": process_instances.pages, + }, + } + + return make_response(jsonify(response_json), 200) + + +def process_instance_show( + modified_process_model_identifier: str, process_instance_id: int +) -> flask.wrappers.Response: + """Create_process_instance.""" + process_model_identifier = modified_process_model_identifier.replace(":", "/") + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + current_version_control_revision = GitService.get_current_revision() + process_model = get_process_model(process_model_identifier) + + if process_model.primary_file_name: + if ( + process_instance.bpmn_version_control_identifier + == current_version_control_revision + ): + bpmn_xml_file_contents = SpecFileService.get_data( + process_model, process_model.primary_file_name + ) + else: + bpmn_xml_file_contents = GitService.get_instance_file_contents_for_revision( + process_model, process_instance.bpmn_version_control_identifier + ) + process_instance.bpmn_xml_file_contents = bpmn_xml_file_contents + + return make_response(jsonify(process_instance), 200) + + +def process_instance_delete(process_instance_id: int) -> flask.wrappers.Response: + """Create_process_instance.""" + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + + # (Pdb) db.session.delete + # > + db.session.query(SpiffLoggingModel).filter_by( + process_instance_id=process_instance.id + ).delete() + db.session.query(SpiffStepDetailsModel).filter_by( + process_instance_id=process_instance.id + ).delete() + db.session.delete(process_instance) + db.session.commit() + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_report_list( + page: int = 1, per_page: int = 100 +) -> flask.wrappers.Response: + """Process_instance_report_list.""" + process_instance_reports = ProcessInstanceReportModel.query.filter_by( + created_by_id=g.user.id, + ).all() + + return make_response(jsonify(process_instance_reports), 200) + + +def process_instance_report_create(body: Dict[str, Any]) -> flask.wrappers.Response: + """Process_instance_report_create.""" + ProcessInstanceReportModel.create_report( + identifier=body["identifier"], + user=g.user, + report_metadata=body["report_metadata"], + ) + + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_report_update( + report_identifier: str, + body: Dict[str, Any], +) -> flask.wrappers.Response: + """Process_instance_report_create.""" + process_instance_report = ProcessInstanceReportModel.query.filter_by( + identifier=report_identifier, + created_by_id=g.user.id, + ).first() + if process_instance_report is None: + raise ApiError( + error_code="unknown_process_instance_report", + message="Unknown process instance report", + status_code=404, + ) + + process_instance_report.report_metadata = body["report_metadata"] + db.session.commit() + + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def process_instance_report_delete( + report_identifier: str, +) -> flask.wrappers.Response: + """Process_instance_report_create.""" + process_instance_report = ProcessInstanceReportModel.query.filter_by( + identifier=report_identifier, + created_by_id=g.user.id, + ).first() + if process_instance_report is None: + raise ApiError( + error_code="unknown_process_instance_report", + message="Unknown process instance report", + status_code=404, + ) + + db.session.delete(process_instance_report) + db.session.commit() + + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def service_tasks_show() -> flask.wrappers.Response: + """Service_tasks_show.""" + available_connectors = ServiceTaskService.available_connectors() + print(available_connectors) + + return Response( + json.dumps(available_connectors), status=200, mimetype="application/json" + ) + + +def authentication_list() -> flask.wrappers.Response: + """Authentication_list.""" + available_authentications = ServiceTaskService.authentication_list() + response_json = { + "results": available_authentications, + "connector_proxy_base_url": current_app.config["CONNECTOR_PROXY_URL"], + "redirect_url": f"{current_app.config['SPIFFWORKFLOW_BACKEND_URL']}/v1.0/authentication_callback", + } + + return Response(json.dumps(response_json), status=200, mimetype="application/json") + + +def authentication_callback( + service: str, + auth_method: str, +) -> werkzeug.wrappers.Response: + """Authentication_callback.""" + verify_token(request.args.get("token"), force_run=True) + response = request.args["response"] + SecretService().update_secret( + f"{service}/{auth_method}", response, g.user.id, create_if_not_exists=True + ) + return redirect( + f"{current_app.config['SPIFFWORKFLOW_FRONTEND_URL']}/admin/configuration" + ) + + +def process_instance_report_show( + report_identifier: str, + page: int = 1, + per_page: int = 100, +) -> flask.wrappers.Response: + """Process_instance_list.""" + process_instances = ProcessInstanceModel.query.order_by( # .filter_by(process_model_identifier=process_model.id) + ProcessInstanceModel.start_in_seconds.desc(), ProcessInstanceModel.id.desc() # type: ignore + ).paginate( + page=page, per_page=per_page, error_out=False + ) + + process_instance_report = ProcessInstanceReportModel.query.filter_by( + identifier=report_identifier, + created_by_id=g.user.id, + ).first() + if process_instance_report is None: + raise ApiError( + error_code="unknown_process_instance_report", + message="Unknown process instance report", + status_code=404, + ) + + substitution_variables = request.args.to_dict() + result_dict = process_instance_report.generate_report( + process_instances.items, substitution_variables + ) + + # update this if we go back to a database query instead of filtering in memory + result_dict["pagination"] = { + "count": len(result_dict["results"]), + "total": len(result_dict["results"]), + "pages": 1, + } + + return Response(json.dumps(result_dict), status=200, mimetype="application/json") + + +# TODO: see comment for before_request +# @process_api_blueprint.route("/v1.0/tasks", methods=["GET"]) +def task_list_my_tasks(page: int = 1, per_page: int = 100) -> flask.wrappers.Response: + """Task_list_my_tasks.""" + principal = find_principal_or_raise() + active_tasks = ( + ActiveTaskModel.query.order_by(desc(ActiveTaskModel.id)) # type: ignore + .join(ProcessInstanceModel) + .join(ActiveTaskUserModel) + .filter_by(user_id=principal.user_id) + # just need this add_columns to add the process_model_identifier. Then add everything back that was removed. + .add_columns( + ProcessInstanceModel.process_model_identifier, + ProcessInstanceModel.process_model_display_name, + ProcessInstanceModel.status, + ActiveTaskModel.task_name, + ActiveTaskModel.task_title, + ActiveTaskModel.task_type, + ActiveTaskModel.task_status, + ActiveTaskModel.task_id, + ActiveTaskModel.id, + ActiveTaskModel.process_model_display_name, + ActiveTaskModel.process_instance_id, + ) + .paginate(page=page, per_page=per_page, error_out=False) + ) + tasks = [ActiveTaskModel.to_task(active_task) for active_task in active_tasks.items] + + response_json = { + "results": tasks, + "pagination": { + "count": len(active_tasks.items), + "total": active_tasks.total, + "pages": active_tasks.pages, + }, + } + + return make_response(jsonify(response_json), 200) + + +def task_list_for_my_open_processes( + page: int = 1, per_page: int = 100 +) -> flask.wrappers.Response: + """Task_list_for_my_open_processes.""" + return get_tasks(page=page, per_page=per_page) + + +def task_list_for_me(page: int = 1, per_page: int = 100) -> flask.wrappers.Response: + """Task_list_for_processes_started_by_others.""" + return get_tasks( + processes_started_by_user=False, + has_lane_assignment_id=False, + page=page, + per_page=per_page, + ) + + +def task_list_for_my_groups( + page: int = 1, per_page: int = 100 +) -> flask.wrappers.Response: + """Task_list_for_processes_started_by_others.""" + return get_tasks(processes_started_by_user=False, page=page, per_page=per_page) + + +def get_tasks( + processes_started_by_user: bool = True, + has_lane_assignment_id: bool = True, + page: int = 1, + per_page: int = 100, +) -> flask.wrappers.Response: + """Get_tasks.""" + user_id = g.user.id + + # use distinct to ensure we only get one row per active task otherwise + # we can get back multiple for the same active task row which throws off + # pagination later on + # https://stackoverflow.com/q/34582014/6090676 + active_tasks_query = ( + ActiveTaskModel.query.distinct() + .outerjoin(GroupModel, GroupModel.id == ActiveTaskModel.lane_assignment_id) + .join(ProcessInstanceModel) + .join(UserModel, UserModel.id == ProcessInstanceModel.process_initiator_id) + ) + + if processes_started_by_user: + active_tasks_query = active_tasks_query.filter( + ProcessInstanceModel.process_initiator_id == user_id + ).outerjoin( + ActiveTaskUserModel, + and_( + ActiveTaskUserModel.user_id == user_id, + ActiveTaskModel.id == ActiveTaskUserModel.active_task_id, + ), + ) + else: + active_tasks_query = active_tasks_query.filter( + ProcessInstanceModel.process_initiator_id != user_id + ).join( + ActiveTaskUserModel, + and_( + ActiveTaskUserModel.user_id == user_id, + ActiveTaskModel.id == ActiveTaskUserModel.active_task_id, + ), + ) + if has_lane_assignment_id: + active_tasks_query = active_tasks_query.filter( + ActiveTaskModel.lane_assignment_id.is_not(None) # type: ignore + ) + else: + active_tasks_query = active_tasks_query.filter(ActiveTaskModel.lane_assignment_id.is_(None)) # type: ignore + + active_tasks = active_tasks_query.add_columns( + ProcessInstanceModel.process_model_identifier, + ProcessInstanceModel.status.label("process_instance_status"), # type: ignore + ProcessInstanceModel.updated_at_in_seconds, + ProcessInstanceModel.created_at_in_seconds, + UserModel.username, + GroupModel.identifier.label("group_identifier"), + ActiveTaskModel.task_name, + ActiveTaskModel.task_title, + ActiveTaskModel.process_model_display_name, + ActiveTaskModel.process_instance_id, + ActiveTaskUserModel.user_id.label("current_user_is_potential_owner"), + ).paginate(page=page, per_page=per_page, error_out=False) + + response_json = { + "results": active_tasks.items, + "pagination": { + "count": len(active_tasks.items), + "total": active_tasks.total, + "pages": active_tasks.pages, + }, + } + return make_response(jsonify(response_json), 200) + + +def process_instance_task_list( + modified_process_model_id: str, + process_instance_id: int, + all_tasks: bool = False, + spiff_step: int = 0, +) -> flask.wrappers.Response: + """Process_instance_task_list.""" + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + + if spiff_step > 0: + step_detail = ( + db.session.query(SpiffStepDetailsModel) + .filter( + SpiffStepDetailsModel.process_instance_id == process_instance.id, + SpiffStepDetailsModel.spiff_step == spiff_step, + ) + .first() + ) + if step_detail is not None and process_instance.bpmn_json is not None: + bpmn_json = json.loads(process_instance.bpmn_json) + bpmn_json["tasks"] = step_detail.task_json + process_instance.bpmn_json = json.dumps(bpmn_json) + + processor = ProcessInstanceProcessor(process_instance) + + spiff_tasks = None + if all_tasks: + spiff_tasks = processor.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) + else: + spiff_tasks = processor.get_all_user_tasks() + + tasks = [] + for spiff_task in spiff_tasks: + task = ProcessInstanceService.spiff_task_to_api_task(spiff_task) + task.data = spiff_task.data + tasks.append(task) + + return make_response(jsonify(tasks), 200) + + +def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response: + """Task_show.""" + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + + if process_instance.status == ProcessInstanceStatus.suspended.value: + raise ApiError( + error_code="error_suspended", + message="The process instance is suspended", + status_code=400, + ) + + process_model = get_process_model( + process_instance.process_model_identifier, + ) + + form_schema_file_name = "" + form_ui_schema_file_name = "" + spiff_task = get_spiff_task_from_process_instance(task_id, process_instance) + extensions = spiff_task.task_spec.extensions + + if "properties" in extensions: + properties = extensions["properties"] + if "formJsonSchemaFilename" in properties: + form_schema_file_name = properties["formJsonSchemaFilename"] + if "formUiSchemaFilename" in properties: + form_ui_schema_file_name = properties["formUiSchemaFilename"] + task = ProcessInstanceService.spiff_task_to_api_task(spiff_task) + task.data = spiff_task.data + task.process_model_display_name = process_model.display_name + task.process_model_identifier = process_model.id + process_model_with_form = process_model + + if task.type == "User Task": + if not form_schema_file_name: + raise ( + ApiError( + error_code="missing_form_file", + message=f"Cannot find a form file for process_instance_id: {process_instance_id}, task_id: {task_id}", + status_code=400, + ) + ) + + form_contents = prepare_form_data( + form_schema_file_name, + task.data, + process_model_with_form, + ) + + try: + # form_contents is a str + form_dict = json.loads(form_contents) + except Exception as exception: + raise ( + ApiError( + error_code="error_loading_form", + message=f"Could not load form schema from: {form_schema_file_name}. Error was: {str(exception)}", + status_code=400, + ) + ) from exception + + if task.data: + _update_form_schema_with_task_data_as_needed(form_dict, task.data) + + if form_contents: + task.form_schema = form_dict + + if form_ui_schema_file_name: + ui_form_contents = prepare_form_data( + form_ui_schema_file_name, + task.data, + process_model_with_form, + ) + if ui_form_contents: + task.form_ui_schema = ui_form_contents + + if task.properties and task.data and "instructionsForEndUser" in task.properties: + print( + f"task.properties['instructionsForEndUser']: {task.properties['instructionsForEndUser']}" + ) + if task.properties["instructionsForEndUser"]: + task.properties["instructionsForEndUser"] = render_jinja_template( + task.properties["instructionsForEndUser"], task.data + ) + return make_response(jsonify(task), 200) + + +def task_submit( + process_instance_id: int, + task_id: str, + body: Dict[str, Any], + terminate_loop: bool = False, +) -> flask.wrappers.Response: + """Task_submit_user_data.""" + principal = find_principal_or_raise() + process_instance = find_process_instance_by_id_or_raise(process_instance_id) + + processor = ProcessInstanceProcessor(process_instance) + spiff_task = get_spiff_task_from_process_instance( + task_id, process_instance, processor=processor + ) + AuthorizationService.assert_user_can_complete_spiff_task( + process_instance.id, spiff_task, principal.user + ) + + if spiff_task.state != TaskState.READY: + raise ( + ApiError( + error_code="invalid_state", + message="You may not update a task unless it is in the READY state.", + status_code=400, + ) + ) + + if terminate_loop and spiff_task.is_looping(): + spiff_task.terminate_loop() + + active_task = ActiveTaskModel.query.filter_by( + process_instance_id=process_instance_id, task_id=task_id + ).first() + if active_task is None: + raise ( + ApiError( + error_code="no_active_task", + message="Cannot find an active task with task id '{task_id}' for process instance {process_instance_id}.", + status_code=500, + ) + ) + + ProcessInstanceService.complete_form_task( + processor=processor, + spiff_task=spiff_task, + data=body, + user=g.user, + active_task=active_task, + ) + + # If we need to update all tasks, then get the next ready task and if it a multi-instance with the same + # task spec, complete that form as well. + # if update_all: + # last_index = spiff_task.task_info()["mi_index"] + # next_task = processor.next_task() + # while next_task and next_task.task_info()["mi_index"] > last_index: + # __update_task(processor, next_task, form_data, user) + # last_index = next_task.task_info()["mi_index"] + # next_task = processor.next_task() + + next_active_task_assigned_to_me = ( + ActiveTaskModel.query.filter_by(process_instance_id=process_instance_id) + .order_by(asc(ActiveTaskModel.id)) # type: ignore + .join(ActiveTaskUserModel) + .filter_by(user_id=principal.user_id) + .first() + ) + if next_active_task_assigned_to_me: + return make_response( + jsonify(ActiveTaskModel.to_task(next_active_task_assigned_to_me)), 200 + ) + + return Response(json.dumps({"ok": True}), status=202, mimetype="application/json") + + +def script_unit_test_create( + process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]] +) -> flask.wrappers.Response: + """Script_unit_test_create.""" + bpmn_task_identifier = _get_required_parameter_or_raise( + "bpmn_task_identifier", body + ) + input_json = _get_required_parameter_or_raise("input_json", body) + expected_output_json = _get_required_parameter_or_raise( + "expected_output_json", body + ) + + process_model_identifier = f"{process_group_id}/{process_model_id}" + process_model = get_process_model(process_model_identifier) + file = SpecFileService.get_files(process_model, process_model.primary_file_name)[0] + if file is None: + raise ApiError( + error_code="cannot_find_file", + message=f"Could not find the primary bpmn file for process_model: {process_model.id}", + status_code=404, + ) + + # TODO: move this to an xml service or something + file_contents = SpecFileService.get_data(process_model, file.name) + bpmn_etree_element = etree.fromstring(file_contents) + + nsmap = bpmn_etree_element.nsmap + spiff_element_maker = ElementMaker( + namespace="http://spiffworkflow.org/bpmn/schema/1.0/core", nsmap=nsmap + ) + + script_task_elements = bpmn_etree_element.xpath( + f"//bpmn:scriptTask[@id='{bpmn_task_identifier}']", + namespaces={"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL"}, + ) + if len(script_task_elements) == 0: + raise ApiError( + error_code="missing_script_task", + message=f"Cannot find a script task with id: {bpmn_task_identifier}", + status_code=404, + ) + script_task_element = script_task_elements[0] + + extension_elements = None + extension_elements_array = script_task_element.xpath( + "//bpmn:extensionElements", + namespaces={"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL"}, + ) + if len(extension_elements_array) == 0: + bpmn_element_maker = ElementMaker( + namespace="http://www.omg.org/spec/BPMN/20100524/MODEL", nsmap=nsmap + ) + extension_elements = bpmn_element_maker("extensionElements") + script_task_element.append(extension_elements) + else: + extension_elements = extension_elements_array[0] + + unit_test_elements = None + unit_test_elements_array = extension_elements.xpath( + "//spiffworkflow:unitTests", + namespaces={"spiffworkflow": "http://spiffworkflow.org/bpmn/schema/1.0/core"}, + ) + if len(unit_test_elements_array) == 0: + unit_test_elements = spiff_element_maker("unitTests") + extension_elements.append(unit_test_elements) + else: + unit_test_elements = unit_test_elements_array[0] + + fuzz = "".join( + random.choice(string.ascii_uppercase + string.digits) # noqa: S311 + for _ in range(7) + ) + unit_test_id = f"unit_test_{fuzz}" + + input_json_element = spiff_element_maker("inputJson", json.dumps(input_json)) + expected_output_json_element = spiff_element_maker( + "expectedOutputJson", json.dumps(expected_output_json) + ) + unit_test_element = spiff_element_maker("unitTest", id=unit_test_id) + unit_test_element.append(input_json_element) + unit_test_element.append(expected_output_json_element) + unit_test_elements.append(unit_test_element) + SpecFileService.update_file( + process_model, file.name, etree.tostring(bpmn_etree_element) + ) + + return Response(json.dumps({"ok": True}), status=202, mimetype="application/json") + + +def script_unit_test_run( + process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]] +) -> flask.wrappers.Response: + """Script_unit_test_run.""" + # FIXME: We should probably clear this somewhere else but this works + current_app.config["THREAD_LOCAL_DATA"].process_instance_id = None + current_app.config["THREAD_LOCAL_DATA"].spiff_step = None + + python_script = _get_required_parameter_or_raise("python_script", body) + input_json = _get_required_parameter_or_raise("input_json", body) + expected_output_json = _get_required_parameter_or_raise( + "expected_output_json", body + ) + + result = ScriptUnitTestRunner.run_with_script_and_pre_post_contexts( + python_script, input_json, expected_output_json + ) + return make_response(jsonify(result), 200) + + +def get_file_from_request() -> Any: + """Get_file_from_request.""" + request_file = connexion.request.files.get("file") + if not request_file: + raise ApiError( + error_code="no_file_given", + message="Given request does not contain a file", + status_code=400, + ) + return request_file + + +def get_process_model(process_model_id: str) -> ProcessModelInfo: + """Get_process_model.""" + process_model = None + try: + process_model = ProcessModelService.get_process_model(process_model_id) + except ProcessEntityNotFoundError as exception: + raise ( + ApiError( + error_code="process_model_cannot_be_found", + message=f"Process model cannot be found: {process_model_id}", + status_code=400, + ) + ) from exception + + return process_model + + +def find_principal_or_raise() -> PrincipalModel: + """Find_principal_or_raise.""" + principal = PrincipalModel.query.filter_by(user_id=g.user.id).first() + if principal is None: + raise ( + ApiError( + error_code="principal_not_found", + message=f"Principal not found from user id: {g.user.id}", + status_code=400, + ) + ) + return principal # type: ignore + + +def find_process_instance_by_id_or_raise( + process_instance_id: int, +) -> ProcessInstanceModel: + """Find_process_instance_by_id_or_raise.""" + process_instance_query = ProcessInstanceModel.query.filter_by( + id=process_instance_id + ) + + # we had a frustrating session trying to do joins and access columns from two tables. here's some notes for our future selves: + # this returns an object that allows you to do: process_instance.UserModel.username + # process_instance = db.session.query(ProcessInstanceModel, UserModel).filter_by(id=process_instance_id).first() + # you can also use splat with add_columns, but it still didn't ultimately give us access to the process instance + # attributes or username like we wanted: + # process_instance_query.join(UserModel).add_columns(*ProcessInstanceModel.__table__.columns, UserModel.username) + + process_instance = process_instance_query.first() + if process_instance is None: + raise ( + ApiError( + error_code="process_instance_cannot_be_found", + message=f"Process instance cannot be found: {process_instance_id}", + status_code=400, + ) + ) + return process_instance # type: ignore + + +def get_value_from_array_with_index(array: list, index: int) -> Any: + """Get_value_from_array_with_index.""" + if index < 0: + return None + + if index >= len(array): + return None + + return array[index] + + +def prepare_form_data( + form_file: str, task_data: Union[dict, None], process_model: ProcessModelInfo +) -> str: + """Prepare_form_data.""" + if task_data is None: + return "" + + file_contents = SpecFileService.get_data(process_model, form_file).decode("utf-8") + return render_jinja_template(file_contents, task_data) + + +def render_jinja_template(unprocessed_template: str, data: dict[str, Any]) -> str: + """Render_jinja_template.""" + jinja_environment = jinja2.Environment( + autoescape=True, lstrip_blocks=True, trim_blocks=True + ) + template = jinja_environment.from_string(unprocessed_template) + return template.render(**data) + + +def get_spiff_task_from_process_instance( + task_id: str, + process_instance: ProcessInstanceModel, + processor: Union[ProcessInstanceProcessor, None] = None, +) -> SpiffTask: + """Get_spiff_task_from_process_instance.""" + if processor is None: + processor = ProcessInstanceProcessor(process_instance) + task_uuid = uuid.UUID(task_id) + spiff_task = processor.bpmn_process_instance.get_task(task_uuid) + + if spiff_task is None: + raise ( + ApiError( + error_code="empty_task", + message="Processor failed to obtain task.", + status_code=500, + ) + ) + return spiff_task + + +# +# Methods for secrets CRUD - maybe move somewhere else: +# +def get_secret(key: str) -> Optional[str]: + """Get_secret.""" + return SecretService.get_secret(key) + + +def secret_list( + page: int = 1, + per_page: int = 100, +) -> Response: + """Secret_list.""" + secrets = ( + SecretModel.query.order_by(SecretModel.key) + .join(UserModel) + .add_columns( + UserModel.username, + ) + .paginate(page=page, per_page=per_page, error_out=False) + ) + response_json = { + "results": secrets.items, + "pagination": { + "count": len(secrets.items), + "total": secrets.total, + "pages": secrets.pages, + }, + } + return make_response(jsonify(response_json), 200) + + +def add_secret(body: Dict) -> Response: + """Add secret.""" + secret_model = SecretService().add_secret(body["key"], body["value"], g.user.id) + assert secret_model # noqa: S101 + return Response( + json.dumps(SecretModelSchema().dump(secret_model)), + status=201, + mimetype="application/json", + ) + + +def update_secret(key: str, body: dict) -> Response: + """Update secret.""" + SecretService().update_secret(key, body["value"], g.user.id) + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def delete_secret(key: str) -> Response: + """Delete secret.""" + current_user = UserService.current_user() + SecretService.delete_secret(key, current_user.id) + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") + + +def _get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any: + """Get_required_parameter_or_raise.""" + return_value = None + if parameter in post_body: + return_value = post_body[parameter] + + if return_value is None or return_value == "": + raise ( + ApiError( + error_code="missing_required_parameter", + message=f"Parameter is missing from json request body: {parameter}", + status_code=400, + ) + ) + + return return_value + + +# originally from: https://bitcoden.com/answers/python-nested-dictionary-update-value-where-any-nested-key-matches +def _update_form_schema_with_task_data_as_needed( + in_dict: dict, task_data: dict +) -> None: + """Update_nested.""" + for k, value in in_dict.items(): + if "anyOf" == k: + # value will look like the array on the right of "anyOf": ["options_from_task_data_var:awesome_options"] + if isinstance(value, list): + if len(value) == 1: + first_element_in_value_list = value[0] + if isinstance(first_element_in_value_list, str): + if first_element_in_value_list.startswith( + "options_from_task_data_var:" + ): + task_data_var = first_element_in_value_list.replace( + "options_from_task_data_var:", "" + ) + + if task_data_var not in task_data: + raise ( + ApiError( + error_code="missing_task_data_var", + message=f"Task data is missing variable: {task_data_var}", + status_code=500, + ) + ) + + select_options_from_task_data = task_data.get(task_data_var) + if isinstance(select_options_from_task_data, list): + if all( + "value" in d and "label" in d + for d in select_options_from_task_data + ): + + def map_function( + task_data_select_option: TaskDataSelectOption, + ) -> ReactJsonSchemaSelectOption: + """Map_function.""" + return { + "type": "string", + "enum": [task_data_select_option["value"]], + "title": task_data_select_option["label"], + } + + options_for_react_json_schema_form = list( + map(map_function, select_options_from_task_data) + ) + + in_dict[k] = options_for_react_json_schema_form + elif isinstance(value, dict): + _update_form_schema_with_task_data_as_needed(value, task_data) + elif isinstance(value, list): + for o in value: + if isinstance(o, dict): + _update_form_schema_with_task_data_as_needed(o, task_data) + + +def update_task_data(process_instance_id: str, task_id: str, body: Dict) -> Response: + """Update task data.""" + process_instance = ProcessInstanceModel.query.filter( + ProcessInstanceModel.id == int(process_instance_id) + ).first() + if process_instance: + process_instance_bpmn_json_dict = json.loads(process_instance.bpmn_json) + if "new_task_data" in body: + new_task_data_str: str = body["new_task_data"] + new_task_data_dict = json.loads(new_task_data_str) + if task_id in process_instance_bpmn_json_dict["tasks"]: + process_instance_bpmn_json_dict["tasks"][task_id][ + "data" + ] = new_task_data_dict + process_instance.bpmn_json = json.dumps(process_instance_bpmn_json_dict) + db.session.add(process_instance) + try: + db.session.commit() + except Exception as e: + db.session.rollback() + raise ApiError( + error_code="update_task_data_error", + message=f"Could not update the Instance. Original error is {e}", + ) from e + else: + raise ApiError( + error_code="update_task_data_error", + message=f"Could not find Task: {task_id} in Instance: {process_instance_id}.", + ) + else: + raise ApiError( + error_code="update_task_data_error", + message=f"Could not update task data for Instance: {process_instance_id}, and Task: {task_id}.", + ) + return Response( + json.dumps(ProcessInstanceModelSchema().dump(process_instance)), + status=200, + mimetype="application/json", + ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py index 739e689d..9e4c54be 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py @@ -12,6 +12,7 @@ from typing import Union import connexion # type: ignore import flask.wrappers import jinja2 +from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel import werkzeug from flask import Blueprint from flask import current_app @@ -27,10 +28,10 @@ from lxml import etree # type: ignore from lxml.builder import ElementMaker # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState -from sqlalchemy import and_ +from sqlalchemy import and_, func from sqlalchemy import asc from sqlalchemy import desc -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import aliased, joinedload from spiffworkflow_backend.exceptions.process_entity_not_found_error import ( ProcessEntityNotFoundError, @@ -928,6 +929,26 @@ def process_instance_list( UserGroupAssignmentModel.user_id == g.user.id ) + # userSkillF = aliased(UserSkill) + # userSkillI = aliased(UserSkill) + + # import pdb; pdb.set_trace() + stock_columns = ProcessInstanceReportService.get_column_names_for_model(ProcessInstanceModel) + # print(f"stock_columns: {stock_columns}") + # import pdb; pdb.set_trace() + # for column in process_instance_report.report_metadata['columns']: + # if column not in stock_columns: + # # continue + for column in [{'accessor': 'key1'}]: + # print(f"column: {column['accessor']}") + # process_instance_query = process_instance_query.outerjoin(ProcessInstanceMetadataModel, ProcessInstanceModel.id == ProcessInstanceMetadataModel.process_instance_id, ProcessInstanceMetadataModel.key == column['accessor']) + instance_metadata_alias = aliased(ProcessInstanceMetadataModel) + process_instance_query = ( + process_instance_query.options(joinedload(instance_metadata_alias, ProcessInstanceModel.id == instance_metadata_alias.process_instance_id, innerjoin=False)).filter(instance_metadata_alias.key == column['accessor']) + .add_column(func.max(instance_metadata_alias.value).label(column['accessor'])) + ) + # import pdb; pdb.set_trace() + process_instances = ( process_instance_query.group_by(ProcessInstanceModel.id) .order_by( @@ -935,14 +956,26 @@ def process_instance_list( ) .paginate(page=page, per_page=per_page, error_out=False) ) + import pdb; pdb.set_trace() - results = list( - map( - ProcessInstanceService.serialize_flat_with_task_data, - process_instances.items, - ) - ) + # def awesome_serialize(process_instance) + # dict_thing = process_instance.serialize + # + # # add columns since we have access to columns here + # dict_thing['awesome'] = 'awesome' + # + # return dict_thing + + # results = list( + # map( + # ProcessInstanceService.serialize_flat_with_task_data, + # process_instances.items, + # ) + # ) + results = ProcessInstanceReportService.add_metadata_columns_to_process_instance(process_instances.items, process_instance_report.report_metadata['columns']) report_metadata = process_instance_report.report_metadata + print(f"results: {results}") + import pdb; pdb.set_trace() response_json = { "report_identifier": process_instance_report.identifier, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authentication_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authentication_service.py index 18f08d0f..3868adf6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authentication_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authentication_service.py @@ -235,8 +235,9 @@ class AuthenticationService: refresh_token_object: RefreshTokenModel = RefreshTokenModel.query.filter( RefreshTokenModel.user_id == user_id ).first() - assert refresh_token_object # noqa: S101 - return refresh_token_object.token + if refresh_token_object: + return refresh_token_object.token + return None @classmethod def get_auth_token_from_refresh_token(cls, refresh_token: str) -> dict: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_report_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_report_service.py index fc5a93da..6c579826 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_report_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_report_service.py @@ -1,6 +1,8 @@ """Process_instance_report_service.""" from dataclasses import dataclass +from flask_bpmn.models.db import db from typing import Optional +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance_report import ( ProcessInstanceReportModel, @@ -241,3 +243,20 @@ class ProcessInstanceReportService: ) return report_filter + + @classmethod + def add_metadata_columns_to_process_instance(cls, process_instance_sqlalchemy_rows, metadata_columns: list[dict]) -> list[dict]: + stock_columns = cls.get_column_names_for_model(ProcessInstanceModel) + results = [] + for process_instance in process_instance_sqlalchemy_rows: + process_instance_dict = process_instance['ProcessInstanceModel'].serialized + for metadata_column in metadata_columns: + if metadata_column['accessor'] not in stock_columns: + process_instance_dict[metadata_column['accessor']] = process_instance[metadata_column['accessor']] + + results.append(process_instance_dict) + return results + + @classmethod + def get_column_names_for_model(cls, model: db.Model) -> list[str]: + return [i.name for i in model.__table__.columns] diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 4c60cb8c..b7fc0479 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2588,12 +2588,17 @@ class TestProcessApi(BaseTest): response = client.get( f"/v1.0/process-instances?report_identifier={process_instance_report.identifier}", + # f"/v1.0/process-instances?report_identifier=demo1", headers=self.logged_in_headers(with_super_admin_user), ) print(f"response.json: {response.json}") - assert response.status_code == 200 assert response.json is not None + assert response.status_code == 200 + assert len(response.json["results"]) == 1 + assert response.json["results"][0]["status"] == "complete" + assert response.json["results"][0]["id"] == process_instance.id + # assert response.json["results"][0]["key1"] == "value1" assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["pages"] == 1 assert response.json["pagination"]["total"] == 1