enable flake8-bandit, cut off all http requests at 15 seconds to avoid hanging process, ignore xml lib spiff uses

This commit is contained in:
burnettk 2023-05-30 19:53:26 -04:00
parent 408b60187b
commit 9d5d9086c6
No known key found for this signature in database
9 changed files with 22 additions and 14 deletions

View File

@ -26,7 +26,9 @@ from spiffworkflow_backend import create_app # noqa: E402
@pytest.fixture(scope="session")
def app() -> Flask: # noqa
os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "unit_testing"
os.environ["FLASK_SESSION_SECRET_KEY"] = "e7711a3ba96c46c68e084a86952de16f"
os.environ["FLASK_SESSION_SECRET_KEY"] = (
"e7711a3ba96c46c68e084a86952de16f" # noqa: S105, do not care about security when running unit tests
)
app = create_app()
return app

View File

@ -173,7 +173,7 @@ select = [
"F", # pyflakes
"N", # pep8-naming
"PL", # pylint
# "S", # flake8-bandit
"S", # flake8-bandit
"UP", # pyupgrade
"W", # pycodestyle warning
"I001" # isort
@ -192,7 +192,7 @@ target-version = "py310"
[tool.ruff.per-file-ignores]
"migrations/versions/*.py" = ["E501"]
"tests/**/*.py" = ["PLR2004"]
"tests/**/*.py" = ["PLR2004", "S101"] # PLR2004 is about magic vars, S101 allows assert
[tool.ruff.isort]
force-single-line = true

View File

@ -9,6 +9,8 @@ from werkzeug.utils import ImportStringError
from spiffworkflow_backend.services.logging_service import setup_logger
HTTP_REQUEST_TIMEOUT_SECONDS = 15
class ConfigurationError(Exception):
pass

View File

@ -3,7 +3,7 @@ import os
from os import environ
TESTING = True
SECRET_KEY = "the_secret_key"
SECRET_KEY = "the_secret_key" # noqa: S105, do not care about security when running unit tests
SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = environ.get("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default="true") == "true"
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(

View File

@ -5,6 +5,8 @@ import requests
from flask import current_app
from flask.wrappers import Response
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
def connector_proxy_typeahead_url() -> Any:
"""Returns the connector proxy type ahead url."""
@ -14,7 +16,7 @@ def connector_proxy_typeahead_url() -> Any:
def typeahead(category: str, prefix: str, limit: int) -> flask.wrappers.Response:
url = f"{connector_proxy_typeahead_url()}/v1/typeahead/{category}?prefix={prefix}&limit={limit}"
proxy_response = requests.get(url)
proxy_response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
status = proxy_response.status_code
response = proxy_response.text

View File

@ -7,6 +7,7 @@ import jwt
import requests
from flask import current_app
from flask import redirect
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.refresh_token import RefreshTokenModel
from werkzeug.wrappers import Response
@ -78,7 +79,7 @@ class AuthenticationService:
openid_config_url = f"{cls.server_url()}/.well-known/openid-configuration"
if name not in AuthenticationService.ENDPOINT_CACHE:
try:
response = requests.get(openid_config_url)
response = requests.get(openid_config_url, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
AuthenticationService.ENDPOINT_CACHE = response.json()
except requests.exceptions.ConnectionError as ce:
raise OpenIdConnectionError(f"Cannot connect to given open id url: {openid_config_url}") from ce
@ -139,7 +140,7 @@ class AuthenticationService:
request_url = self.open_id_endpoint_for_name("token_endpoint")
response = requests.post(request_url, data=data, headers=headers)
response = requests.post(request_url, data=data, headers=headers, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
auth_token_object: dict = json.loads(response.text)
return auth_token_object
@ -244,6 +245,6 @@ class AuthenticationService:
request_url = cls.open_id_endpoint_for_name("token_endpoint")
response = requests.post(request_url, data=data, headers=headers)
response = requests.post(request_url, data=data, headers=headers, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
auth_token_object: dict = json.loads(response.text)
return auth_token_object

View File

@ -132,7 +132,7 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
with open(bpmn_file, "rb") as f_handle:
data = f_handle.read()
etree_xml_parser = etree.XMLParser(resolve_entities=False)
return etree.fromstring(data, parser=etree_xml_parser)
return etree.fromstring(data, parser=etree_xml_parser) # noqa: S320
def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]:
related_bpmn_files = []
@ -160,7 +160,7 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
# if we cannot load process model then ignore it since it can cause errors unrelated
# to the test and if it is related, it will most likely be caught further along the test
try:
root = etree.fromstring(file_contents, parser=etree_xml_parser)
root = etree.fromstring(file_contents, parser=etree_xml_parser) # noqa: S320
except etree.XMLSyntaxError:
continue

View File

@ -6,6 +6,7 @@ import requests
import sentry_sdk
from flask import current_app
from flask import g
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.secret_service import SecretService
from spiffworkflow_backend.services.user_service import UserService
@ -78,7 +79,7 @@ class ServiceTaskDelegate:
params = {k: ServiceTaskDelegate.check_prefixes(v["value"]) for k, v in bpmn_params.items()}
params["spiff__task_data"] = task_data
proxied_response = requests.post(call_url, json=params)
proxied_response = requests.post(call_url, json=params, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
response_text = proxied_response.text
json_parse_error = None
@ -128,7 +129,7 @@ class ServiceTaskService:
def available_connectors() -> Any:
"""Returns a list of available connectors."""
try:
response = requests.get(f"{connector_proxy_url()}/v1/commands")
response = requests.get(f"{connector_proxy_url()}/v1/commands", timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
if response.status_code != 200:
return []
@ -143,7 +144,7 @@ class ServiceTaskService:
def authentication_list() -> Any:
"""Returns a list of available authentications."""
try:
response = requests.get(f"{connector_proxy_url()}/v1/auths")
response = requests.get(f"{connector_proxy_url()}/v1/auths", timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
if response.status_code != 200:
return []

View File

@ -79,7 +79,7 @@ class SpecFileService(FileSystemService):
def get_etree_from_xml_bytes(cls, binary_data: bytes) -> etree.Element:
"""Get_etree_from_xml_bytes."""
etree_xml_parser = etree.XMLParser(resolve_entities=False)
return etree.fromstring(binary_data, parser=etree_xml_parser)
return etree.fromstring(binary_data, parser=etree_xml_parser) # noqa: S320
@classmethod
def get_references_for_file_contents(