From f2c982a96cb9d427d842dfa9a89773d89ef58ae3 Mon Sep 17 00:00:00 2001 From: Kevin Burnett <18027+burnettk@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:44:13 +0000 Subject: [PATCH] Look for bad get_current_user scripts (#1370) * initial script to find get_current_user calls * handle pre and post scripts as well * refactor * omit absolute root dir path when printing * avoid printing NS * consolidate lxml parsing in one place --------- Co-authored-by: burnettk --- Makefile | 2 +- bin/run_pyl | 2 +- .../bin/lint_get_current_user_scripts.py | 70 +++++++++++++++++++ .../process_model_test_runner_service.py | 5 +- .../services/spec_file_service.py | 12 ++-- 5 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 spiffworkflow-backend/bin/lint_get_current_user_scripts.py diff --git a/Makefile b/Makefile index 85f89e61..8dbc5956 100644 --- a/Makefile +++ b/Makefile @@ -112,7 +112,7 @@ pre-commit: $(IN_ARENA) poetry run pre-commit run --verbose --all-files ruff: - $(IN_ARENA) poetry run ruff --fix spiffworkflow-backend + $(IN_ARENA) poetry run ruff check --fix spiffworkflow-backend run-pyl: fe-lint-fix ruff pre-commit be-mypy be-tests-par @true diff --git a/bin/run_pyl b/bin/run_pyl index 9f191c30..ff2e570a 100755 --- a/bin/run_pyl +++ b/bin/run_pyl @@ -24,7 +24,7 @@ function get_python_dirs() { function run_autofixers() { python_dirs="$(get_python_dirs) bin" # shellcheck disable=2086 - poetry run ruff --fix $python_dirs || echo '' + poetry run ruff check --fix $python_dirs || echo '' } function run_pre_commmit() { diff --git a/spiffworkflow-backend/bin/lint_get_current_user_scripts.py b/spiffworkflow-backend/bin/lint_get_current_user_scripts.py new file mode 100644 index 00000000..0740ded2 --- /dev/null +++ b/spiffworkflow-backend/bin/lint_get_current_user_scripts.py @@ -0,0 +1,70 @@ +import glob +import os +from typing import NoReturn + +from lxml import etree +from spiffworkflow_backend import create_app +from spiffworkflow_backend.services.file_system_service import FileSystemService + + +def find_script_tasks_with_get_current_user(bpmn_file_path: str, root_path: str) -> None: + from spiffworkflow_backend.services.spec_file_service import SpecFileService + + try: + with open(bpmn_file_path, "rb") as bpmn_file: + binary_data = bpmn_file.read() + tree = SpecFileService.get_etree_from_xml_bytes(binary_data) + check_script_and_prescript_elements(tree, bpmn_file_path, root_path) + except etree.XMLSyntaxError: + print(f"Error parsing XML in file {bpmn_file_path}. Please check for syntax issues.") + return + + +def check_script_and_prescript_elements(tree, bpmn_file_path: str, root_path: str) -> None: + # Define the namespace map to search for elements + nsmap = { + "bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL", + "spiffworkflow": "http://spiffworkflow.org/bpmn/schema/1.0/core", + } + # Find all script tasks and preScript elements + script_tasks = tree.xpath("//bpmn:scriptTask", namespaces=nsmap) + pre_scripts = tree.xpath("//spiffworkflow:preScript", namespaces=nsmap) + + # Check script tasks for get_current_user() calls + for task in script_tasks: + script = task.find("bpmn:script", namespaces=nsmap) + if script is not None and script.text is not None and "get_current_user()" in script.text: + print(f'Found get_current_user() in script task {task.get("id")} of file {bpmn_file_path}') + + # Check preScript elements for get_current_user() calls + check_scripts_for_get_current_user(pre_scripts, bpmn_file_path, "preScript", root_path) + post_scripts = tree.xpath("//spiffworkflow:postScript", namespaces=nsmap) + check_scripts_for_get_current_user(post_scripts, bpmn_file_path, "postScript", root_path) + + +def check_scripts_for_get_current_user(scripts, bpmn_file_path: str, script_type: str, root_path: str) -> None: + for script in scripts: + if script is not None and script.text is not None and "get_current_user()" in script.text: + # Get the parent of the parent to find the actual BPMN element + parent = script.getparent().getparent() + relative_path = os.path.relpath(bpmn_file_path, root_path) + tag_without_namespace = etree.QName(parent.tag).localname + if tag_without_namespace not in ["manualTask", "userTask"]: + print( + f"Found get_current_user() in {script_type} of {tag_without_namespace} " + f'with id {parent.get("id")} in file {relative_path}' + ) + + +def main() -> NoReturn: + app = create_app() + with app.app_context(): + hot_dir = FileSystemService.root_path() + # Search for BPMN files and check for get_current_user() calls in script tasks + bpmn_files = glob.glob(os.path.expanduser(f"{hot_dir}/**/*.bpmn"), recursive=True) + for bpmn_file in bpmn_files: + find_script_tasks_with_get_current_user(bpmn_file, hot_dir) + + +if __name__ == "__main__": + main() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py index 97f8ff05..cc416126 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py @@ -29,6 +29,7 @@ from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.services.custom_parser import MyCustomParser from spiffworkflow_backend.services.jinja_service import JinjaHelpers from spiffworkflow_backend.services.process_instance_processor import CustomScriptEngineEnvironment +from spiffworkflow_backend.services.spec_file_service import SpecFileService DEFAULT_NSMAP = { "bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL", @@ -283,11 +284,9 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega parser.add_dmn_files_by_glob(dmn_file_glob) def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element: - data = None with open(bpmn_file, "rb") as f_handle: data = f_handle.read() - etree_xml_parser = etree.XMLParser(resolve_entities=False) - return etree.fromstring(data, parser=etree_xml_parser) # noqa: S320 + return SpecFileService.get_etree_from_xml_bytes(data) def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]: related_bpmn_files = [] diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py index 3e567bfb..8757db25 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py @@ -56,15 +56,17 @@ class SpecFileService(FileSystemService): @classmethod def get_references_for_file(cls, file: File, process_model_info: ProcessModelInfo) -> list[Reference]: - full_file_path = SpecFileService.full_file_path(process_model_info, file.name) - file_contents: bytes = b"" - with open(full_file_path) as f: - file_contents = f.read().encode() + full_file_path = cls.full_file_path(process_model_info, file.name) + with open(full_file_path, "rb") as f: + file_contents = f.read() return cls.get_references_for_file_contents(process_model_info, file.name, file_contents) + # This is designed to isolate xml parsing, which is a security issue, and make it as safe as possible. + # S320 indicates that xml parsing with lxml is unsafe. To mitigate this, we add options to the parser + # to make it as safe as we can. No exploits have been demonstrated with this parser, but we will try to stay alert. @classmethod def get_etree_from_xml_bytes(cls, binary_data: bytes) -> etree.Element: - etree_xml_parser = etree.XMLParser(resolve_entities=False) + etree_xml_parser = etree.XMLParser(resolve_entities=False, remove_comments=True, no_network=True) return etree.fromstring(binary_data, parser=etree_xml_parser) # noqa: S320 @classmethod