Look for bad get_current_user scripts (#1370)
* initial script to find get_current_user calls * handle pre and post scripts as well * refactor * omit absolute root dir path when printing * avoid printing NS * consolidate lxml parsing in one place --------- Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
parent
2bec24aa63
commit
f2c982a96c
2
Makefile
2
Makefile
|
@ -112,7 +112,7 @@ pre-commit:
|
|||
$(IN_ARENA) poetry run pre-commit run --verbose --all-files
|
||||
|
||||
ruff:
|
||||
$(IN_ARENA) poetry run ruff --fix spiffworkflow-backend
|
||||
$(IN_ARENA) poetry run ruff check --fix spiffworkflow-backend
|
||||
|
||||
run-pyl: fe-lint-fix ruff pre-commit be-mypy be-tests-par
|
||||
@true
|
||||
|
|
|
@ -24,7 +24,7 @@ function get_python_dirs() {
|
|||
function run_autofixers() {
|
||||
python_dirs="$(get_python_dirs) bin"
|
||||
# shellcheck disable=2086
|
||||
poetry run ruff --fix $python_dirs || echo ''
|
||||
poetry run ruff check --fix $python_dirs || echo ''
|
||||
}
|
||||
|
||||
function run_pre_commmit() {
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
import glob
|
||||
import os
|
||||
from typing import NoReturn
|
||||
|
||||
from lxml import etree
|
||||
from spiffworkflow_backend import create_app
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
|
||||
|
||||
def find_script_tasks_with_get_current_user(bpmn_file_path: str, root_path: str) -> None:
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
|
||||
try:
|
||||
with open(bpmn_file_path, "rb") as bpmn_file:
|
||||
binary_data = bpmn_file.read()
|
||||
tree = SpecFileService.get_etree_from_xml_bytes(binary_data)
|
||||
check_script_and_prescript_elements(tree, bpmn_file_path, root_path)
|
||||
except etree.XMLSyntaxError:
|
||||
print(f"Error parsing XML in file {bpmn_file_path}. Please check for syntax issues.")
|
||||
return
|
||||
|
||||
|
||||
def check_script_and_prescript_elements(tree, bpmn_file_path: str, root_path: str) -> None:
|
||||
# Define the namespace map to search for elements
|
||||
nsmap = {
|
||||
"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL",
|
||||
"spiffworkflow": "http://spiffworkflow.org/bpmn/schema/1.0/core",
|
||||
}
|
||||
# Find all script tasks and preScript elements
|
||||
script_tasks = tree.xpath("//bpmn:scriptTask", namespaces=nsmap)
|
||||
pre_scripts = tree.xpath("//spiffworkflow:preScript", namespaces=nsmap)
|
||||
|
||||
# Check script tasks for get_current_user() calls
|
||||
for task in script_tasks:
|
||||
script = task.find("bpmn:script", namespaces=nsmap)
|
||||
if script is not None and script.text is not None and "get_current_user()" in script.text:
|
||||
print(f'Found get_current_user() in script task {task.get("id")} of file {bpmn_file_path}')
|
||||
|
||||
# Check preScript elements for get_current_user() calls
|
||||
check_scripts_for_get_current_user(pre_scripts, bpmn_file_path, "preScript", root_path)
|
||||
post_scripts = tree.xpath("//spiffworkflow:postScript", namespaces=nsmap)
|
||||
check_scripts_for_get_current_user(post_scripts, bpmn_file_path, "postScript", root_path)
|
||||
|
||||
|
||||
def check_scripts_for_get_current_user(scripts, bpmn_file_path: str, script_type: str, root_path: str) -> None:
|
||||
for script in scripts:
|
||||
if script is not None and script.text is not None and "get_current_user()" in script.text:
|
||||
# Get the parent of the parent to find the actual BPMN element
|
||||
parent = script.getparent().getparent()
|
||||
relative_path = os.path.relpath(bpmn_file_path, root_path)
|
||||
tag_without_namespace = etree.QName(parent.tag).localname
|
||||
if tag_without_namespace not in ["manualTask", "userTask"]:
|
||||
print(
|
||||
f"Found get_current_user() in {script_type} of {tag_without_namespace} "
|
||||
f'with id {parent.get("id")} in file {relative_path}'
|
||||
)
|
||||
|
||||
|
||||
def main() -> NoReturn:
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
hot_dir = FileSystemService.root_path()
|
||||
# Search for BPMN files and check for get_current_user() calls in script tasks
|
||||
bpmn_files = glob.glob(os.path.expanduser(f"{hot_dir}/**/*.bpmn"), recursive=True)
|
||||
for bpmn_file in bpmn_files:
|
||||
find_script_tasks_with_get_current_user(bpmn_file, hot_dir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -29,6 +29,7 @@ from spiffworkflow_backend.scripts.script import Script
|
|||
from spiffworkflow_backend.services.custom_parser import MyCustomParser
|
||||
from spiffworkflow_backend.services.jinja_service import JinjaHelpers
|
||||
from spiffworkflow_backend.services.process_instance_processor import CustomScriptEngineEnvironment
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
|
||||
DEFAULT_NSMAP = {
|
||||
"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL",
|
||||
|
@ -283,11 +284,9 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
|
|||
parser.add_dmn_files_by_glob(dmn_file_glob)
|
||||
|
||||
def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element:
|
||||
data = None
|
||||
with open(bpmn_file, "rb") as f_handle:
|
||||
data = f_handle.read()
|
||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||
return etree.fromstring(data, parser=etree_xml_parser) # noqa: S320
|
||||
return SpecFileService.get_etree_from_xml_bytes(data)
|
||||
|
||||
def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]:
|
||||
related_bpmn_files = []
|
||||
|
|
|
@ -56,15 +56,17 @@ class SpecFileService(FileSystemService):
|
|||
|
||||
@classmethod
|
||||
def get_references_for_file(cls, file: File, process_model_info: ProcessModelInfo) -> list[Reference]:
|
||||
full_file_path = SpecFileService.full_file_path(process_model_info, file.name)
|
||||
file_contents: bytes = b""
|
||||
with open(full_file_path) as f:
|
||||
file_contents = f.read().encode()
|
||||
full_file_path = cls.full_file_path(process_model_info, file.name)
|
||||
with open(full_file_path, "rb") as f:
|
||||
file_contents = f.read()
|
||||
return cls.get_references_for_file_contents(process_model_info, file.name, file_contents)
|
||||
|
||||
# This is designed to isolate xml parsing, which is a security issue, and make it as safe as possible.
|
||||
# S320 indicates that xml parsing with lxml is unsafe. To mitigate this, we add options to the parser
|
||||
# to make it as safe as we can. No exploits have been demonstrated with this parser, but we will try to stay alert.
|
||||
@classmethod
|
||||
def get_etree_from_xml_bytes(cls, binary_data: bytes) -> etree.Element:
|
||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||
etree_xml_parser = etree.XMLParser(resolve_entities=False, remove_comments=True, no_network=True)
|
||||
return etree.fromstring(binary_data, parser=etree_xml_parser) # noqa: S320
|
||||
|
||||
@classmethod
|
||||
|
|
Loading…
Reference in New Issue