mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-01-27 01:40:48 +00:00
enable flake8-bandit, cut off all http requests at 15 seconds to avoid hanging process, ignore xml lib spiff uses
This commit is contained in:
parent
aae043e0a4
commit
397d99b5c0
@ -26,7 +26,9 @@ from spiffworkflow_backend import create_app # noqa: E402
|
||||
@pytest.fixture(scope="session")
|
||||
def app() -> Flask: # noqa
|
||||
os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "unit_testing"
|
||||
os.environ["FLASK_SESSION_SECRET_KEY"] = "e7711a3ba96c46c68e084a86952de16f"
|
||||
os.environ["FLASK_SESSION_SECRET_KEY"] = (
|
||||
"e7711a3ba96c46c68e084a86952de16f" # noqa: S105, do not care about security when running unit tests
|
||||
)
|
||||
app = create_app()
|
||||
|
||||
return app
|
||||
|
@ -173,7 +173,7 @@ select = [
|
||||
"F", # pyflakes
|
||||
"N", # pep8-naming
|
||||
"PL", # pylint
|
||||
# "S", # flake8-bandit
|
||||
"S", # flake8-bandit
|
||||
"UP", # pyupgrade
|
||||
"W", # pycodestyle warning
|
||||
"I001" # isort
|
||||
@ -192,7 +192,7 @@ target-version = "py310"
|
||||
|
||||
[tool.ruff.per-file-ignores]
|
||||
"migrations/versions/*.py" = ["E501"]
|
||||
"tests/**/*.py" = ["PLR2004"]
|
||||
"tests/**/*.py" = ["PLR2004", "S101"] # PLR2004 is about magic vars, S101 allows assert
|
||||
|
||||
[tool.ruff.isort]
|
||||
force-single-line = true
|
||||
|
@ -9,6 +9,8 @@ from werkzeug.utils import ImportStringError
|
||||
|
||||
from spiffworkflow_backend.services.logging_service import setup_logger
|
||||
|
||||
HTTP_REQUEST_TIMEOUT_SECONDS = 15
|
||||
|
||||
|
||||
class ConfigurationError(Exception):
|
||||
pass
|
||||
|
@ -3,7 +3,7 @@ import os
|
||||
from os import environ
|
||||
|
||||
TESTING = True
|
||||
SECRET_KEY = "the_secret_key"
|
||||
SECRET_KEY = "the_secret_key" # noqa: S105, do not care about security when running unit tests
|
||||
SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = environ.get("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default="true") == "true"
|
||||
|
||||
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
|
||||
|
@ -5,6 +5,8 @@ import requests
|
||||
from flask import current_app
|
||||
from flask.wrappers import Response
|
||||
|
||||
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
|
||||
|
||||
|
||||
def connector_proxy_typeahead_url() -> Any:
|
||||
"""Returns the connector proxy type ahead url."""
|
||||
@ -14,7 +16,7 @@ def connector_proxy_typeahead_url() -> Any:
|
||||
def typeahead(category: str, prefix: str, limit: int) -> flask.wrappers.Response:
|
||||
url = f"{connector_proxy_typeahead_url()}/v1/typeahead/{category}?prefix={prefix}&limit={limit}"
|
||||
|
||||
proxy_response = requests.get(url)
|
||||
proxy_response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
status = proxy_response.status_code
|
||||
response = proxy_response.text
|
||||
|
||||
|
@ -7,6 +7,7 @@ import jwt
|
||||
import requests
|
||||
from flask import current_app
|
||||
from flask import redirect
|
||||
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.refresh_token import RefreshTokenModel
|
||||
from werkzeug.wrappers import Response
|
||||
@ -78,7 +79,7 @@ class AuthenticationService:
|
||||
openid_config_url = f"{cls.server_url()}/.well-known/openid-configuration"
|
||||
if name not in AuthenticationService.ENDPOINT_CACHE:
|
||||
try:
|
||||
response = requests.get(openid_config_url)
|
||||
response = requests.get(openid_config_url, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
AuthenticationService.ENDPOINT_CACHE = response.json()
|
||||
except requests.exceptions.ConnectionError as ce:
|
||||
raise OpenIdConnectionError(f"Cannot connect to given open id url: {openid_config_url}") from ce
|
||||
@ -139,7 +140,7 @@ class AuthenticationService:
|
||||
|
||||
request_url = self.open_id_endpoint_for_name("token_endpoint")
|
||||
|
||||
response = requests.post(request_url, data=data, headers=headers)
|
||||
response = requests.post(request_url, data=data, headers=headers, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
auth_token_object: dict = json.loads(response.text)
|
||||
return auth_token_object
|
||||
|
||||
@ -244,6 +245,6 @@ class AuthenticationService:
|
||||
|
||||
request_url = cls.open_id_endpoint_for_name("token_endpoint")
|
||||
|
||||
response = requests.post(request_url, data=data, headers=headers)
|
||||
response = requests.post(request_url, data=data, headers=headers, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
auth_token_object: dict = json.loads(response.text)
|
||||
return auth_token_object
|
||||
|
@ -132,7 +132,7 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
|
||||
with open(bpmn_file, "rb") as f_handle:
|
||||
data = f_handle.read()
|
||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||
return etree.fromstring(data, parser=etree_xml_parser)
|
||||
return etree.fromstring(data, parser=etree_xml_parser) # noqa: S320
|
||||
|
||||
def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]:
|
||||
related_bpmn_files = []
|
||||
@ -160,7 +160,7 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
|
||||
# if we cannot load process model then ignore it since it can cause errors unrelated
|
||||
# to the test and if it is related, it will most likely be caught further along the test
|
||||
try:
|
||||
root = etree.fromstring(file_contents, parser=etree_xml_parser)
|
||||
root = etree.fromstring(file_contents, parser=etree_xml_parser) # noqa: S320
|
||||
except etree.XMLSyntaxError:
|
||||
continue
|
||||
|
||||
|
@ -6,6 +6,7 @@ import requests
|
||||
import sentry_sdk
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
from spiffworkflow_backend.config import HTTP_REQUEST_TIMEOUT_SECONDS
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.secret_service import SecretService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
@ -78,7 +79,7 @@ class ServiceTaskDelegate:
|
||||
params = {k: ServiceTaskDelegate.check_prefixes(v["value"]) for k, v in bpmn_params.items()}
|
||||
params["spiff__task_data"] = task_data
|
||||
|
||||
proxied_response = requests.post(call_url, json=params)
|
||||
proxied_response = requests.post(call_url, json=params, timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
response_text = proxied_response.text
|
||||
json_parse_error = None
|
||||
|
||||
@ -128,7 +129,7 @@ class ServiceTaskService:
|
||||
def available_connectors() -> Any:
|
||||
"""Returns a list of available connectors."""
|
||||
try:
|
||||
response = requests.get(f"{connector_proxy_url()}/v1/commands")
|
||||
response = requests.get(f"{connector_proxy_url()}/v1/commands", timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
|
||||
if response.status_code != 200:
|
||||
return []
|
||||
@ -143,7 +144,7 @@ class ServiceTaskService:
|
||||
def authentication_list() -> Any:
|
||||
"""Returns a list of available authentications."""
|
||||
try:
|
||||
response = requests.get(f"{connector_proxy_url()}/v1/auths")
|
||||
response = requests.get(f"{connector_proxy_url()}/v1/auths", timeout=HTTP_REQUEST_TIMEOUT_SECONDS)
|
||||
|
||||
if response.status_code != 200:
|
||||
return []
|
||||
|
@ -79,7 +79,7 @@ class SpecFileService(FileSystemService):
|
||||
def get_etree_from_xml_bytes(cls, binary_data: bytes) -> etree.Element:
|
||||
"""Get_etree_from_xml_bytes."""
|
||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||
return etree.fromstring(binary_data, parser=etree_xml_parser)
|
||||
return etree.fromstring(binary_data, parser=etree_xml_parser) # noqa: S320
|
||||
|
||||
@classmethod
|
||||
def get_references_for_file_contents(
|
||||
|
Loading…
x
Reference in New Issue
Block a user