mypy fixes

This commit is contained in:
mike cullerton 2022-08-01 16:20:36 -04:00
parent 53449334d7
commit 5a37ea14d0
9 changed files with 542 additions and 593 deletions

149
poetry.lock generated
View File

@ -1922,6 +1922,25 @@ category = "main"
optional = false optional = false
python-versions = "*" python-versions = "*"
[[package]]
name = "types-requests"
version = "2.28.6"
description = "Typing stubs for requests"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
types-urllib3 = "<1.27"
[[package]]
name = "types-urllib3"
version = "1.26.20"
description = "Typing stubs for urllib3"
category = "main"
optional = false
python-versions = "*"
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"
version = "4.3.0" version = "4.3.0"
@ -2068,15 +2087,15 @@ Pygments = {version = "*", optional = true, markers = "python_version >= \"3.5.0
six = "*" six = "*"
[package.extras] [package.extras]
all = ["cmake", "codecov", "ninja", "pybind11", "scikit-build", "six", "colorama", "pytest", "pytest-cov", "debugpy", "pytest", "pygments", "pytest", "pytest-cov", "pytest", "pytest-cov", "jupyter-client", "ipython", "debugpy", "ipykernel", "pytest", "debugpy", "debugpy", "typing", "debugpy", "ipython-genutils", "pytest", "pygments", "attrs", "jedi", "jinja2", "jupyter-core", "nbconvert", "pytest-cov", "jupyter-client", "ipython", "ipykernel"] tests = ["pytest-cov", "pytest", "typing", "pytest", "pytest-cov", "pytest", "pytest-cov", "pytest", "pytest", "pytest-cov", "pytest", "scikit-build", "pybind11", "ninja", "codecov", "cmake"]
all-strict = ["cmake (==3.21.2)", "codecov (==2.0.15)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "scikit-build (==0.11.1)", "six (==1.11.0)", "colorama (==0.4.1)", "pytest-cov (==2.8.1)", "pytest (==4.6.0)", "debugpy (==1.3.0)", "pytest (==4.6.0)", "Pygments (==2.0.0)", "pytest-cov (==2.8.1)", "pytest (==4.6.0)", "pytest-cov (==2.9.0)", "pytest (==4.6.0)", "jupyter-client (==6.1.5)", "IPython (==7.10.0)", "debugpy (==1.0.0)", "ipykernel (==5.2.0)", "pytest (==4.6.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "typing (==3.7.4)", "debugpy (==1.6.0)", "ipython-genutils (==0.2.0)", "pytest (==6.2.5)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "pytest-cov (==3.0.0)", "jupyter-client (==7.0.0)", "IPython (==7.23.1)", "ipykernel (==6.0.0)"] tests-strict = ["pytest-cov (==3.0.0)", "pytest (==6.2.5)", "typing (==3.7.4)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest-cov (==2.9.0)", "pytest (==4.6.0)", "pytest-cov (==2.8.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest-cov (==2.8.1)", "scikit-build (==0.11.1)", "pybind11 (==2.7.1)", "ninja (==1.10.2)", "codecov (==2.0.15)", "cmake (==3.21.2)"]
colors = ["colorama", "pygments", "pygments"]
jupyter = ["debugpy", "jupyter-client", "ipython", "debugpy", "ipykernel", "debugpy", "debugpy", "debugpy", "ipython-genutils", "attrs", "jedi", "jinja2", "jupyter-core", "nbconvert", "jupyter-client", "ipython", "ipykernel"]
optional-strict = ["colorama (==0.4.1)", "debugpy (==1.3.0)", "tomli (==0.2.0)", "Pygments (==2.0.0)", "jupyter-client (==6.1.5)", "IPython (==7.10.0)", "debugpy (==1.0.0)", "ipykernel (==5.2.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.6.0)", "ipython-genutils (==0.2.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "jupyter-client (==7.0.0)", "IPython (==7.23.1)", "ipykernel (==6.0.0)"]
optional = ["colorama", "debugpy", "tomli", "pygments", "jupyter-client", "ipython", "debugpy", "ipykernel", "debugpy", "debugpy", "debugpy", "ipython-genutils", "pygments", "attrs", "jedi", "jinja2", "jupyter-core", "nbconvert", "jupyter-client", "ipython", "ipykernel"]
runtime-strict = ["six (==1.11.0)"] runtime-strict = ["six (==1.11.0)"]
tests = ["cmake", "codecov", "ninja", "pybind11", "scikit-build", "pytest", "pytest-cov", "pytest", "pytest", "pytest-cov", "pytest", "pytest-cov", "pytest", "typing", "pytest", "pytest-cov"] optional = ["ipykernel", "ipython", "jupyter-client", "nbconvert", "jupyter-core", "jinja2", "jedi", "attrs", "pygments", "ipython-genutils", "debugpy", "debugpy", "debugpy", "ipykernel", "debugpy", "ipython", "jupyter-client", "pygments", "tomli", "debugpy", "colorama"]
tests-strict = ["cmake (==3.21.2)", "codecov (==2.0.15)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "scikit-build (==0.11.1)", "pytest-cov (==2.8.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest-cov (==2.8.1)", "pytest (==4.6.0)", "pytest-cov (==2.9.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "typing (==3.7.4)", "pytest (==6.2.5)", "pytest-cov (==3.0.0)"] optional-strict = ["ipykernel (==6.0.0)", "IPython (==7.23.1)", "jupyter-client (==7.0.0)", "nbconvert (==6.0.0)", "jupyter-core (==4.7.0)", "jinja2 (==3.0.0)", "jedi (==0.16)", "attrs (==19.2.0)", "Pygments (==2.4.1)", "ipython-genutils (==0.2.0)", "debugpy (==1.6.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "ipykernel (==5.2.0)", "debugpy (==1.0.0)", "IPython (==7.10.0)", "jupyter-client (==6.1.5)", "Pygments (==2.0.0)", "tomli (==0.2.0)", "debugpy (==1.3.0)", "colorama (==0.4.1)"]
jupyter = ["ipykernel", "ipython", "jupyter-client", "nbconvert", "jupyter-core", "jinja2", "jedi", "attrs", "ipython-genutils", "debugpy", "debugpy", "debugpy", "ipykernel", "debugpy", "ipython", "jupyter-client", "debugpy"]
colors = ["pygments", "pygments", "colorama"]
all = ["ipykernel", "ipython", "jupyter-client", "pytest-cov", "nbconvert", "jupyter-core", "jinja2", "jedi", "attrs", "pygments", "pytest", "ipython-genutils", "debugpy", "typing", "debugpy", "debugpy", "pytest", "ipykernel", "debugpy", "ipython", "jupyter-client", "pytest-cov", "pytest", "pytest-cov", "pytest", "pygments", "pytest", "debugpy", "pytest-cov", "pytest", "colorama", "six", "scikit-build", "pybind11", "ninja", "codecov", "cmake"]
all-strict = ["ipykernel (==6.0.0)", "IPython (==7.23.1)", "jupyter-client (==7.0.0)", "pytest-cov (==3.0.0)", "nbconvert (==6.0.0)", "jupyter-core (==4.7.0)", "jinja2 (==3.0.0)", "jedi (==0.16)", "attrs (==19.2.0)", "Pygments (==2.4.1)", "pytest (==6.2.5)", "ipython-genutils (==0.2.0)", "debugpy (==1.6.0)", "typing (==3.7.4)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "pytest (==4.6.0)", "ipykernel (==5.2.0)", "debugpy (==1.0.0)", "IPython (==7.10.0)", "jupyter-client (==6.1.5)", "pytest (==4.6.0)", "pytest-cov (==2.9.0)", "pytest (==4.6.0)", "pytest-cov (==2.8.1)", "Pygments (==2.0.0)", "pytest (==4.6.0)", "debugpy (==1.3.0)", "pytest (==4.6.0)", "pytest-cov (==2.8.1)", "colorama (==0.4.1)", "six (==1.11.0)", "scikit-build (==0.11.1)", "pybind11 (==2.7.1)", "ninja (==1.10.2)", "codecov (==2.0.15)", "cmake (==3.21.2)"]
[[package]] [[package]]
name = "zipp" name = "zipp"
@ -2093,7 +2112,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.9" python-versions = "^3.9"
content-hash = "66503576ef158089a92526ed6982bc93481868a47fec14c47d1ce9a5bcc08979" content-hash = "d5a3a36e18fc130235b2610426058283487b25632c523c11aea97879cdda6172"
[metadata.files] [metadata.files]
alabaster = [ alabaster = [
@ -2294,10 +2313,7 @@ flake8 = [
flake8-bandit = [ flake8-bandit = [
{file = "flake8_bandit-2.1.2.tar.gz", hash = "sha256:687fc8da2e4a239b206af2e54a90093572a60d0954f3054e23690739b0b0de3b"}, {file = "flake8_bandit-2.1.2.tar.gz", hash = "sha256:687fc8da2e4a239b206af2e54a90093572a60d0954f3054e23690739b0b0de3b"},
] ]
flake8-bugbear = [ flake8-bugbear = []
{file = "flake8-bugbear-22.7.1.tar.gz", hash = "sha256:e450976a07e4f9d6c043d4f72b17ec1baf717fe37f7997009c8ae58064f88305"},
{file = "flake8_bugbear-22.7.1-py3-none-any.whl", hash = "sha256:db5d7a831ef4412a224b26c708967ff816818cabae415e76b8c58df156c4b8e5"},
]
flake8-docstrings = [ flake8-docstrings = [
{file = "flake8-docstrings-1.6.0.tar.gz", hash = "sha256:9fe7c6a306064af8e62a055c2f61e9eb1da55f84bb39caef2b84ce53708ac34b"}, {file = "flake8-docstrings-1.6.0.tar.gz", hash = "sha256:9fe7c6a306064af8e62a055c2f61e9eb1da55f84bb39caef2b84ce53708ac34b"},
{file = "flake8_docstrings-1.6.0-py2.py3-none-any.whl", hash = "sha256:99cac583d6c7e32dd28bbfbef120a7c0d1b6dde4adb5a9fd441c4227a6534bde"}, {file = "flake8_docstrings-1.6.0-py2.py3-none-any.whl", hash = "sha256:99cac583d6c7e32dd28bbfbef120a7c0d1b6dde4adb5a9fd441c4227a6534bde"},
@ -2526,79 +2542,7 @@ libcst = [
livereload = [ livereload = [
{file = "livereload-2.6.3.tar.gz", hash = "sha256:776f2f865e59fde56490a56bcc6773b6917366bce0c267c60ee8aaf1a0959869"}, {file = "livereload-2.6.3.tar.gz", hash = "sha256:776f2f865e59fde56490a56bcc6773b6917366bce0c267c60ee8aaf1a0959869"},
] ]
lxml = [ lxml = []
{file = "lxml-4.9.1-cp27-cp27m-macosx_10_15_x86_64.whl", hash = "sha256:98cafc618614d72b02185ac583c6f7796202062c41d2eeecdf07820bad3295ed"},
{file = "lxml-4.9.1-cp27-cp27m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c62e8dd9754b7debda0c5ba59d34509c4688f853588d75b53c3791983faa96fc"},
{file = "lxml-4.9.1-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:21fb3d24ab430fc538a96e9fbb9b150029914805d551deeac7d7822f64631dfc"},
{file = "lxml-4.9.1-cp27-cp27m-win32.whl", hash = "sha256:86e92728ef3fc842c50a5cb1d5ba2bc66db7da08a7af53fb3da79e202d1b2cd3"},
{file = "lxml-4.9.1-cp27-cp27m-win_amd64.whl", hash = "sha256:4cfbe42c686f33944e12f45a27d25a492cc0e43e1dc1da5d6a87cbcaf2e95627"},
{file = "lxml-4.9.1-cp27-cp27mu-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:dad7b164905d3e534883281c050180afcf1e230c3d4a54e8038aa5cfcf312b84"},
{file = "lxml-4.9.1-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:a614e4afed58c14254e67862456d212c4dcceebab2eaa44d627c2ca04bf86837"},
{file = "lxml-4.9.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:49a866923e69bc7da45a0565636243707c22752fc38f6b9d5c8428a86121022c"},
{file = "lxml-4.9.1-cp310-cp310-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:f9ced82717c7ec65a67667bb05865ffe38af0e835cdd78728f1209c8fffe0cad"},
{file = "lxml-4.9.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:d9fc0bf3ff86c17348dfc5d322f627d78273eba545db865c3cd14b3f19e57fa5"},
{file = "lxml-4.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:e5f66bdf0976ec667fc4594d2812a00b07ed14d1b44259d19a41ae3fff99f2b8"},
{file = "lxml-4.9.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:fe17d10b97fdf58155f858606bddb4e037b805a60ae023c009f760d8361a4eb8"},
{file = "lxml-4.9.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8caf4d16b31961e964c62194ea3e26a0e9561cdf72eecb1781458b67ec83423d"},
{file = "lxml-4.9.1-cp310-cp310-win32.whl", hash = "sha256:4780677767dd52b99f0af1f123bc2c22873d30b474aa0e2fc3fe5e02217687c7"},
{file = "lxml-4.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:b122a188cd292c4d2fcd78d04f863b789ef43aa129b233d7c9004de08693728b"},
{file = "lxml-4.9.1-cp311-cp311-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:be9eb06489bc975c38706902cbc6888f39e946b81383abc2838d186f0e8b6a9d"},
{file = "lxml-4.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:f1be258c4d3dc609e654a1dc59d37b17d7fef05df912c01fc2e15eb43a9735f3"},
{file = "lxml-4.9.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:927a9dd016d6033bc12e0bf5dee1dde140235fc8d0d51099353c76081c03dc29"},
{file = "lxml-4.9.1-cp35-cp35m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:9232b09f5efee6a495a99ae6824881940d6447debe272ea400c02e3b68aad85d"},
{file = "lxml-4.9.1-cp35-cp35m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:04da965dfebb5dac2619cb90fcf93efdb35b3c6994fea58a157a834f2f94b318"},
{file = "lxml-4.9.1-cp35-cp35m-win32.whl", hash = "sha256:4d5bae0a37af799207140652a700f21a85946f107a199bcb06720b13a4f1f0b7"},
{file = "lxml-4.9.1-cp35-cp35m-win_amd64.whl", hash = "sha256:4878e667ebabe9b65e785ac8da4d48886fe81193a84bbe49f12acff8f7a383a4"},
{file = "lxml-4.9.1-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:1355755b62c28950f9ce123c7a41460ed9743c699905cbe664a5bcc5c9c7c7fb"},
{file = "lxml-4.9.1-cp36-cp36m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:bcaa1c495ce623966d9fc8a187da80082334236a2a1c7e141763ffaf7a405067"},
{file = "lxml-4.9.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6eafc048ea3f1b3c136c71a86db393be36b5b3d9c87b1c25204e7d397cee9536"},
{file = "lxml-4.9.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:13c90064b224e10c14dcdf8086688d3f0e612db53766e7478d7754703295c7c8"},
{file = "lxml-4.9.1-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:206a51077773c6c5d2ce1991327cda719063a47adc02bd703c56a662cdb6c58b"},
{file = "lxml-4.9.1-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:e8f0c9d65da595cfe91713bc1222af9ecabd37971762cb830dea2fc3b3bb2acf"},
{file = "lxml-4.9.1-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:8f0a4d179c9a941eb80c3a63cdb495e539e064f8054230844dcf2fcb812b71d3"},
{file = "lxml-4.9.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:830c88747dce8a3e7525defa68afd742b4580df6aa2fdd6f0855481e3994d391"},
{file = "lxml-4.9.1-cp36-cp36m-win32.whl", hash = "sha256:1e1cf47774373777936c5aabad489fef7b1c087dcd1f426b621fda9dcc12994e"},
{file = "lxml-4.9.1-cp36-cp36m-win_amd64.whl", hash = "sha256:5974895115737a74a00b321e339b9c3f45c20275d226398ae79ac008d908bff7"},
{file = "lxml-4.9.1-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:1423631e3d51008871299525b541413c9b6c6423593e89f9c4cfbe8460afc0a2"},
{file = "lxml-4.9.1-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:2aaf6a0a6465d39b5ca69688fce82d20088c1838534982996ec46633dc7ad6cc"},
{file = "lxml-4.9.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:9f36de4cd0c262dd9927886cc2305aa3f2210db437aa4fed3fb4940b8bf4592c"},
{file = "lxml-4.9.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:ae06c1e4bc60ee076292e582a7512f304abdf6c70db59b56745cca1684f875a4"},
{file = "lxml-4.9.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:57e4d637258703d14171b54203fd6822fda218c6c2658a7d30816b10995f29f3"},
{file = "lxml-4.9.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:6d279033bf614953c3fc4a0aa9ac33a21e8044ca72d4fa8b9273fe75359d5cca"},
{file = "lxml-4.9.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:a60f90bba4c37962cbf210f0188ecca87daafdf60271f4c6948606e4dabf8785"},
{file = "lxml-4.9.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:6ca2264f341dd81e41f3fffecec6e446aa2121e0b8d026fb5130e02de1402785"},
{file = "lxml-4.9.1-cp37-cp37m-win32.whl", hash = "sha256:27e590352c76156f50f538dbcebd1925317a0f70540f7dc8c97d2931c595783a"},
{file = "lxml-4.9.1-cp37-cp37m-win_amd64.whl", hash = "sha256:eea5d6443b093e1545ad0210e6cf27f920482bfcf5c77cdc8596aec73523bb7e"},
{file = "lxml-4.9.1-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:f05251bbc2145349b8d0b77c0d4e5f3b228418807b1ee27cefb11f69ed3d233b"},
{file = "lxml-4.9.1-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:487c8e61d7acc50b8be82bda8c8d21d20e133c3cbf41bd8ad7eb1aaeb3f07c97"},
{file = "lxml-4.9.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:8d1a92d8e90b286d491e5626af53afef2ba04da33e82e30744795c71880eaa21"},
{file = "lxml-4.9.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:b570da8cd0012f4af9fa76a5635cd31f707473e65a5a335b186069d5c7121ff2"},
{file = "lxml-4.9.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:5ef87fca280fb15342726bd5f980f6faf8b84a5287fcc2d4962ea8af88b35130"},
{file = "lxml-4.9.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:93e414e3206779ef41e5ff2448067213febf260ba747fc65389a3ddaa3fb8715"},
{file = "lxml-4.9.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:6653071f4f9bac46fbc30f3c7838b0e9063ee335908c5d61fb7a4a86c8fd2036"},
{file = "lxml-4.9.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:32a73c53783becdb7eaf75a2a1525ea8e49379fb7248c3eeefb9412123536387"},
{file = "lxml-4.9.1-cp38-cp38-win32.whl", hash = "sha256:1a7c59c6ffd6ef5db362b798f350e24ab2cfa5700d53ac6681918f314a4d3b94"},
{file = "lxml-4.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:1436cf0063bba7888e43f1ba8d58824f085410ea2025befe81150aceb123e345"},
{file = "lxml-4.9.1-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:4beea0f31491bc086991b97517b9683e5cfb369205dac0148ef685ac12a20a67"},
{file = "lxml-4.9.1-cp39-cp39-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:41fb58868b816c202e8881fd0f179a4644ce6e7cbbb248ef0283a34b73ec73bb"},
{file = "lxml-4.9.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:bd34f6d1810d9354dc7e35158aa6cc33456be7706df4420819af6ed966e85448"},
{file = "lxml-4.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:edffbe3c510d8f4bf8640e02ca019e48a9b72357318383ca60e3330c23aaffc7"},
{file = "lxml-4.9.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:6d949f53ad4fc7cf02c44d6678e7ff05ec5f5552b235b9e136bd52e9bf730b91"},
{file = "lxml-4.9.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:079b68f197c796e42aa80b1f739f058dcee796dc725cc9a1be0cdb08fc45b000"},
{file = "lxml-4.9.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:9c3a88d20e4fe4a2a4a84bf439a5ac9c9aba400b85244c63a1ab7088f85d9d25"},
{file = "lxml-4.9.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4e285b5f2bf321fc0857b491b5028c5f276ec0c873b985d58d7748ece1d770dd"},
{file = "lxml-4.9.1-cp39-cp39-win32.whl", hash = "sha256:ef72013e20dd5ba86a8ae1aed7f56f31d3374189aa8b433e7b12ad182c0d2dfb"},
{file = "lxml-4.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:10d2017f9150248563bb579cd0d07c61c58da85c922b780060dcc9a3aa9f432d"},
{file = "lxml-4.9.1-pp37-pypy37_pp73-macosx_10_15_x86_64.whl", hash = "sha256:0538747a9d7827ce3e16a8fdd201a99e661c7dee3c96c885d8ecba3c35d1032c"},
{file = "lxml-4.9.1-pp37-pypy37_pp73-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:0645e934e940107e2fdbe7c5b6fb8ec6232444260752598bc4d09511bd056c0b"},
{file = "lxml-4.9.1-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:6daa662aba22ef3258934105be2dd9afa5bb45748f4f702a3b39a5bf53a1f4dc"},
{file = "lxml-4.9.1-pp38-pypy38_pp73-macosx_10_15_x86_64.whl", hash = "sha256:603a464c2e67d8a546ddaa206d98e3246e5db05594b97db844c2f0a1af37cf5b"},
{file = "lxml-4.9.1-pp38-pypy38_pp73-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:c4b2e0559b68455c085fb0f6178e9752c4be3bba104d6e881eb5573b399d1eb2"},
{file = "lxml-4.9.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:0f3f0059891d3254c7b5fb935330d6db38d6519ecd238ca4fce93c234b4a0f73"},
{file = "lxml-4.9.1-pp39-pypy39_pp73-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_24_i686.whl", hash = "sha256:c852b1530083a620cb0de5f3cd6826f19862bafeaf77586f1aef326e49d95f0c"},
{file = "lxml-4.9.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:287605bede6bd36e930577c5925fcea17cb30453d96a7b4c63c14a257118dbb9"},
{file = "lxml-4.9.1.tar.gz", hash = "sha256:fe749b052bb7233fe5d072fcb549221a8cb1a16725c47c37e42b0b9cb3ff2c3f"},
]
mako = [ mako = [
{file = "Mako-1.2.0-py3-none-any.whl", hash = "sha256:23aab11fdbbb0f1051b93793a58323ff937e98e34aece1c4219675122e57e4ba"}, {file = "Mako-1.2.0-py3-none-any.whl", hash = "sha256:23aab11fdbbb0f1051b93793a58323ff937e98e34aece1c4219675122e57e4ba"},
{file = "Mako-1.2.0.tar.gz", hash = "sha256:9a7c7e922b87db3686210cf49d5d767033a41d4010b284e747682c92bddd8b39"}, {file = "Mako-1.2.0.tar.gz", hash = "sha256:9a7c7e922b87db3686210cf49d5d767033a41d4010b284e747682c92bddd8b39"},
@ -2784,7 +2728,21 @@ py = [
{file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"}, {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
] ]
pyasn1 = [] pyasn1 = [
{file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
{file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
{file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
{file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
{file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
{file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
{file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
{file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
{file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
{file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
{file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
{file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
{file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
]
pycodestyle = [ pycodestyle = [
{file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"}, {file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"},
{file = "pycodestyle-2.8.0.tar.gz", hash = "sha256:eddd5847ef438ea1c7870ca7eb78a9d47ce0cdb4851a5523949f2601d0cbbe7f"}, {file = "pycodestyle-2.8.0.tar.gz", hash = "sha256:eddd5847ef438ea1c7870ca7eb78a9d47ce0cdb4851a5523949f2601d0cbbe7f"},
@ -2989,7 +2947,10 @@ requests = [
restructuredtext-lint = [ restructuredtext-lint = [
{file = "restructuredtext_lint-1.4.0.tar.gz", hash = "sha256:1b235c0c922341ab6c530390892eb9e92f90b9b75046063e047cacfb0f050c45"}, {file = "restructuredtext_lint-1.4.0.tar.gz", hash = "sha256:1b235c0c922341ab6c530390892eb9e92f90b9b75046063e047cacfb0f050c45"},
] ]
rsa = [] rsa = [
{file = "rsa-4.8-py3-none-any.whl", hash = "sha256:95c5d300c4e879ee69708c428ba566c59478fd653cc3a22243eeb8ed846950bb"},
{file = "rsa-4.8.tar.gz", hash = "sha256:5c6bd9dc7a543b7fe4304a631f8a8a3b674e2bbfc49c2ae96200cdbe55df6b17"},
]
"ruamel.yaml" = [ "ruamel.yaml" = [
{file = "ruamel.yaml-0.17.21-py3-none-any.whl", hash = "sha256:742b35d3d665023981bd6d16b3d24248ce5df75fdb4e2924e93a05c1f8b61ca7"}, {file = "ruamel.yaml-0.17.21-py3-none-any.whl", hash = "sha256:742b35d3d665023981bd6d16b3d24248ce5df75fdb4e2924e93a05c1f8b61ca7"},
{file = "ruamel.yaml-0.17.21.tar.gz", hash = "sha256:8b7ce697a2f212752a35c1ac414471dc16c424c9573be4926b56ff3f5d23b7af"}, {file = "ruamel.yaml-0.17.21.tar.gz", hash = "sha256:8b7ce697a2f212752a35c1ac414471dc16c424c9573be4926b56ff3f5d23b7af"},
@ -3058,10 +3019,7 @@ sphinx-basic-ng = [
{file = "sphinx_basic_ng-0.0.1a11-py3-none-any.whl", hash = "sha256:9aecb5345816998789ef76658a83e3c0a12aafa14b17d40e28cd4aaeb94d1517"}, {file = "sphinx_basic_ng-0.0.1a11-py3-none-any.whl", hash = "sha256:9aecb5345816998789ef76658a83e3c0a12aafa14b17d40e28cd4aaeb94d1517"},
{file = "sphinx_basic_ng-0.0.1a11.tar.gz", hash = "sha256:bf9a8fda0379c7d2ab51c9543f2b18e014b77fb295b49d64f3c1a910c863b34f"}, {file = "sphinx_basic_ng-0.0.1a11.tar.gz", hash = "sha256:bf9a8fda0379c7d2ab51c9543f2b18e014b77fb295b49d64f3c1a910c863b34f"},
] ]
sphinx-click = [ sphinx-click = []
{file = "sphinx-click-4.3.0.tar.gz", hash = "sha256:bd4db5d3c1bec345f07af07b8e28a76cfc5006d997984e38ae246bbf8b9a3b38"},
{file = "sphinx_click-4.3.0-py3-none-any.whl", hash = "sha256:23e85a3cb0b728a421ea773699f6acadefae171d1a764a51dd8ec5981503ccbe"},
]
sphinxcontrib-applehelp = [ sphinxcontrib-applehelp = [
{file = "sphinxcontrib-applehelp-1.0.2.tar.gz", hash = "sha256:a072735ec80e7675e3f432fcae8610ecf509c5f1869d17e2eecff44389cdbc58"}, {file = "sphinxcontrib-applehelp-1.0.2.tar.gz", hash = "sha256:a072735ec80e7675e3f432fcae8610ecf509c5f1869d17e2eecff44389cdbc58"},
{file = "sphinxcontrib_applehelp-1.0.2-py2.py3-none-any.whl", hash = "sha256:806111e5e962be97c29ec4c1e7fe277bfd19e9652fb1a4392105b43e01af885a"}, {file = "sphinxcontrib_applehelp-1.0.2-py2.py3-none-any.whl", hash = "sha256:806111e5e962be97c29ec4c1e7fe277bfd19e9652fb1a4392105b43e01af885a"},
@ -3200,10 +3158,9 @@ types-pytz = [
{file = "types-pytz-2022.1.1.tar.gz", hash = "sha256:4e7add70886dc2ee6ee7535c8184a26eeb0ac9dbafae9962cb882d74b9f67330"}, {file = "types-pytz-2022.1.1.tar.gz", hash = "sha256:4e7add70886dc2ee6ee7535c8184a26eeb0ac9dbafae9962cb882d74b9f67330"},
{file = "types_pytz-2022.1.1-py3-none-any.whl", hash = "sha256:581467742f32f15fff1098698b11fd511057a2a8a7568d33b604083f2b03c24f"}, {file = "types_pytz-2022.1.1-py3-none-any.whl", hash = "sha256:581467742f32f15fff1098698b11fd511057a2a8a7568d33b604083f2b03c24f"},
] ]
typing-extensions = [ types-requests = []
{file = "typing_extensions-4.3.0-py3-none-any.whl", hash = "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"}, types-urllib3 = []
{file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"}, typing-extensions = []
]
typing-inspect = [ typing-inspect = [
{file = "typing_inspect-0.7.1-py2-none-any.whl", hash = "sha256:b1f56c0783ef0f25fb064a01be6e5407e54cf4a4bf4f3ba3fe51e0bd6dcea9e5"}, {file = "typing_inspect-0.7.1-py2-none-any.whl", hash = "sha256:b1f56c0783ef0f25fb064a01be6e5407e54cf4a4bf4f3ba3fe51e0bd6dcea9e5"},
{file = "typing_inspect-0.7.1-py3-none-any.whl", hash = "sha256:3cd7d4563e997719a710a3bfe7ffb544c6b72069b6812a02e9b414a8fa3aaa6b"}, {file = "typing_inspect-0.7.1-py3-none-any.whl", hash = "sha256:3cd7d4563e997719a710a3bfe7ffb544c6b72069b6812a02e9b414a8fa3aaa6b"},

View File

@ -47,6 +47,7 @@ PyJWT = "^2.4.0"
gunicorn = "^20.1.0" gunicorn = "^20.1.0"
types-pytz = "^2022.1.1" types-pytz = "^2022.1.1"
python-keycloak = "^1.9.1" python-keycloak = "^1.9.1"
types-requests = "^2.28.6"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]

View File

@ -10,6 +10,7 @@ from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
from typing import Any
class UserModel(SpiffworkflowBaseDBModel): class UserModel(SpiffworkflowBaseDBModel):
"""UserModel.""" """UserModel."""
@ -62,16 +63,16 @@ class UserModel(SpiffworkflowBaseDBModel):
"""Is_admin.""" """Is_admin."""
return True return True
@classmethod # @classmethod
def from_open_id_user_info(cls, user_info): # def from_open_id_user_info(cls, user_info: dict) -> Any:
"""From_open_id_user_info.""" # """From_open_id_user_info."""
instance = cls() # instance = cls()
instance.service = "keycloak" # instance.service = "keycloak"
instance.service_id = user_info["sub"] # instance.service_id = user_info["sub"]
instance.name = user_info["preferred_username"] # instance.name = user_info["preferred_username"]
instance.username = user_info["sub"] # instance.username = user_info["sub"]
#
return instance # return instance
class UserModelSchema(Schema): class UserModelSchema(Schema):

View File

@ -1,7 +1,7 @@
"""User.""" """User."""
import ast import ast
import base64 import base64
from typing import Dict from typing import Any, Dict
from typing import Optional from typing import Optional
import jwt import jwt
@ -17,6 +17,8 @@ from spiffworkflow_backend.services.authentication_service import (
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
from werkzeug.wrappers.response import Response
""" """
.. module:: crc.api.user .. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers :synopsis: Single Sign On (SSO) user login and session handlers
@ -42,56 +44,58 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
user_model = None user_model = None
decoded_token = get_decoded_token(token) decoded_token = get_decoded_token(token)
if "token_type" in decoded_token: if decoded_token is not None:
token_type = decoded_token["token_type"]
if token_type == "internal": # noqa: S105 if "token_type" in decoded_token:
token_type = decoded_token["token_type"]
if token_type == "internal": # noqa: S105
try:
user_model = get_user_from_decoded_internal_token(decoded_token)
except Exception as e:
current_app.logger.error(
f"Exception in verify_token getting user from decoded internal token. {e}"
)
elif "iss" in decoded_token.keys():
try: try:
user_model = get_user_from_decoded_internal_token(decoded_token) user_info = AuthorizationService().get_user_info_from_id_token(token)
except ApiError as ae:
raise ae
except Exception as e: except Exception as e:
current_app.logger.error( current_app.logger.error(f"Exception raised in get_token: {e}")
f"Exception in verify_token getting user from decoded internal token. {e}" raise ApiError(
) code="fail_get_user_info", message="Cannot get user info from token"
) from e
elif "iss" in decoded_token: if (
try: user_info is not None and "error" not in user_info
user_info = AuthorizationService().get_user_info_from_id_token(token) ): # not sure what to test yet
except ApiError as ae: user_model = (
raise ae UserModel.query.filter(UserModel.service == "keycloak")
except Exception as e: .filter(UserModel.service_id == user_info["sub"])
current_app.logger.error(f"Exception raised in get_token: {e}") .first()
raise ApiError(
code="fail_get_user_info", message="Cannot get user info from token"
) from e
if (
user_info is not None and "error" not in user_info
): # not sure what to test yet
user_model = (
UserModel.query.filter(UserModel.service == "keycloak")
.filter(UserModel.service_id == user_info["sub"])
.first()
)
if user_model is None:
# Do we ever get here any more, now that we have login_return method?
current_app.logger.debug("create_user in verify_token")
user_model = UserService().create_user(
service="keycloak",
service_id=user_info["sub"],
name=user_info["name"],
username=user_info["preferred_username"],
email=user_info["email"],
) )
# no user_info if user_model is None:
# Do we ever get here any more, now that we have login_return method?
current_app.logger.debug("create_user in verify_token")
user_model = UserService().create_user(
service="keycloak",
service_id=user_info["sub"],
name=user_info["name"],
username=user_info["preferred_username"],
email=user_info["email"],
)
# no user_info
else:
raise ApiError(code="no_user_info", message="Cannot retrieve user info")
else: else:
raise ApiError(code="no_user_info", message="Cannot retrieve user info") current_app.logger.debug("token_type not in decode_token in verify_token")
raise ApiError(
else: code="invalid_token",
current_app.logger.debug("token_type not in decode_token in verify_token") message="Invalid token. Please log in.",
raise ApiError( status_code=401,
code="invalid_token", )
message="Invalid token. Please log in.",
status_code=401,
)
if user_model: if user_model:
g.user = user_model g.user = user_model
@ -105,29 +109,34 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
else: else:
raise ApiError(code="no_user_id", message="Cannot get a user id") raise ApiError(code="no_user_id", message="Cannot get a user id")
raise ApiError(code="invalid_token",
message="Cannot validate token.",
status_code=401
)
# no token -- do we ever get here? # no token -- do we ever get here?
else: # else:
if current_app.config.get("DEVELOPMENT"): # ...
# Fall back to a default user if this is not production. # if current_app.config.get("DEVELOPMENT"):
g.user = UserModel.query.first() # # Fall back to a default user if this is not production.
if not g.user: # g.user = UserModel.query.first()
raise ApiError( # if not g.user:
"no_user", # raise ApiError(
"You are in development mode, but there are no users in the database. Add one, and it will use it.", # "no_user",
) # "You are in development mode, but there are no users in the database. Add one, and it will use it.",
token_from_user = g.user.encode_auth_token() # )
token_info = UserModel.decode_auth_token(token_from_user) # token_from_user = g.user.encode_auth_token()
return token_info # token_info = UserModel.decode_auth_token(token_from_user)
# return token_info
else: #
raise ApiError( # else:
code="no_auth_token", # raise ApiError(
message="No authorization token was available.", # code="no_auth_token",
status_code=401, # message="No authorization token was available.",
) # status_code=401,
# )
def validate_scope(token) -> bool: def validate_scope(token: Any) -> bool:
"""Validate_scope.""" """Validate_scope."""
print("validate_scope") print("validate_scope")
# token = AuthorizationService().refresh_token(token) # token = AuthorizationService().refresh_token(token)
@ -139,35 +148,42 @@ def validate_scope(token) -> bool:
return True return True
def api_login(uid, password, redirect_url=None): def api_login(uid: str, password: str, redirect_url: str | None=None) -> dict:
"""Api_login.""" """Api_login."""
# TODO: Fix this! mac 20220801
token = PublicAuthenticationService().get_public_access_token(uid, password) token = PublicAuthenticationService().get_public_access_token(uid, password)
g.token = token g.token = token
return token return token
def encode_auth_token(uid): def encode_auth_token(uid: str) -> str:
"""Generates the Auth Token. """Generates the Auth Token.
:return: string :return: string
""" """
payload = {"sub": uid} payload = {"sub": uid}
if "SECRET_KEY" in current_app.config:
secret_key = current_app.config.get("SECRET_KEY")
else:
current_app.logger.error("Missing SECRET_KEY in encode_auth_token")
raise ApiError(code="encode_error",
message="Missing SECRET_KEY in encode_auth_token")
return jwt.encode( return jwt.encode(
payload, payload,
current_app.config.get("SECRET_KEY"), str(secret_key),
algorithm="HS256", algorithm="HS256",
) )
def login(redirect_url="/"): def login(redirect_url: str="/") -> Response:
"""Login.""" """Login."""
state = PublicAuthenticationService.generate_state(redirect_url) state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(state) login_redirect_url = PublicAuthenticationService().get_login_redirect_url(state)
return redirect(login_redirect_url) return redirect(login_redirect_url)
def login_return(code, state, session_state): def login_return(code: str, state: str, session_state: str) -> Response | None:
"""Login_return.""" """Login_return."""
state_dict = ast.literal_eval( state_dict = ast.literal_eval(
base64.b64decode(ast.literal_eval(state)).decode("utf-8") base64.b64decode(ast.literal_eval(state)).decode("utf-8")
@ -213,18 +229,20 @@ def login_return(code, state, session_state):
+ f"id_token={id_token}" + f"id_token={id_token}"
) )
return redirect(redirect_url) return redirect(redirect_url)
raise ApiError(code="invalid_login",
message="Login failed. Please try again",
status_code=401)
# return f"{code} {state} {id_token}" def logout(id_token: str, redirect_url: str | None) -> Response:
def logout(id_token: str, redirect_url: str | None):
"""Logout.""" """Logout."""
if redirect_url is None:
redirect_url = ''
return PublicAuthenticationService().logout( return PublicAuthenticationService().logout(
id_token=id_token, redirect_url=redirect_url redirect_url=redirect_url, id_token=id_token
) )
def logout_return(): def logout_return() -> Response:
"""Logout_return.""" """Logout_return."""
return redirect("http://localhost:7001/") return redirect("http://localhost:7001/")
@ -257,7 +275,7 @@ def get_decoded_token(token: str) -> Dict | None:
# return True # return True
def get_scope(token): def get_scope(token: str) -> str:
"""Get_scope.""" """Get_scope."""
scope = "" scope = ""
decoded_token = jwt.decode(token, options={"verify_signature": False}) decoded_token = jwt.decode(token, options={"verify_signature": False})
@ -266,13 +284,13 @@ def get_scope(token):
return scope return scope
def get_user_from_decoded_internal_token(decoded_token): def get_user_from_decoded_internal_token(decoded_token: dict) -> UserModel | None:
"""Get_user_from_decoded_internal_token.""" """Get_user_from_decoded_internal_token."""
sub = decoded_token["sub"] sub = decoded_token["sub"]
parts = sub.split("::") parts = sub.split("::")
service = parts[0].split(":")[1] service = parts[0].split(":")[1]
service_id = parts[1].split(":")[1] service_id = parts[1].split(":")[1]
user = ( user: UserModel = (
UserModel.query.filter(UserModel.service == service) UserModel.query.filter(UserModel.service == service)
.filter(UserModel.service_id == service_id) .filter(UserModel.service_id == service_id)
.first() .first()

View File

@ -11,12 +11,13 @@ from flask import current_app
from flask import redirect from flask import redirect
from flask_bpmn.api.api_error import ApiError from flask_bpmn.api.api_error import ApiError
from keycloak import KeycloakOpenID # type: ignore from keycloak import KeycloakOpenID # type: ignore
from keycloak.uma_permissions import AuthStatus # noqa: F401 # from keycloak.uma_permissions import AuthStatus # noqa: F401
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from werkzeug.wrappers.response import Response
def get_keycloak_args(): def get_keycloak_args() -> tuple:
"""Get_keycloak_args.""" """Get_keycloak_args."""
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"] keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"] keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
@ -47,13 +48,10 @@ class PublicAuthenticationService:
Used during development to make testing easy. Used during development to make testing easy.
""" """
def logout(self, redirect_url: str = "/", id_token: str | None = None): def logout(self, id_token: str , redirect_url: str | None = None) -> Response:
"""Logout.""" """Logout."""
if id_token is None: if redirect_url is None:
raise ApiError( redirect_url = '/'
code="missing_id_token", message="id_token is missing", status_code=400
)
return_redirect_url = "http://localhost:7000/v1.0/logout_return" return_redirect_url = "http://localhost:7000/v1.0/logout_return"
( (
keycloak_server_url, keycloak_server_url,
@ -70,12 +68,12 @@ class PublicAuthenticationService:
return redirect(request_url) return redirect(request_url)
@staticmethod @staticmethod
def generate_state(redirect_url): def generate_state(redirect_url: str) -> bytes:
"""Generate_state.""" """Generate_state."""
state = base64.b64encode(bytes(str({"redirect_url": redirect_url}), "UTF-8")) state = base64.b64encode(bytes(str({"redirect_url": redirect_url}), "UTF-8"))
return state return state
def get_login_redirect_url(self, state): def get_login_redirect_url(self, state: bytes) -> str:
"""Get_login_redirect_url.""" """Get_login_redirect_url."""
( (
keycloak_server_url, keycloak_server_url,
@ -86,7 +84,7 @@ class PublicAuthenticationService:
return_redirect_url = "http://localhost:7000/v1.0/login_return" return_redirect_url = "http://localhost:7000/v1.0/login_return"
login_redirect_url = ( login_redirect_url = (
f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth?" f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth?"
+ f"state={state}&" + f"state={state!r}&"
+ "response_type=code&" + "response_type=code&"
+ f"client_id={keycloak_client_id}&" + f"client_id={keycloak_client_id}&"
+ "scope=openid&" + "scope=openid&"
@ -94,7 +92,7 @@ class PublicAuthenticationService:
) )
return login_redirect_url return login_redirect_url
def get_id_token_object(self, code): def get_id_token_object(self, code: str) -> dict:
"""Get_id_token_object.""" """Get_id_token_object."""
( (
keycloak_server_url, keycloak_server_url,
@ -120,11 +118,11 @@ class PublicAuthenticationService:
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token" request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
response = requests.post(request_url, data=data, headers=headers) response = requests.post(request_url, data=data, headers=headers)
id_token_object = json.loads(response.text) id_token_object: dict = json.loads(response.text)
return id_token_object return id_token_object
@staticmethod @staticmethod
def validate_id_token(id_token): def validate_id_token(id_token: str) -> bool:
"""Https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation.""" """Https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation."""
valid = True valid = True
now = time.time() now = time.time()
@ -171,7 +169,7 @@ class PublicAuthenticationService:
return True return True
def get_public_access_token(self, username, password) -> dict: def get_public_access_token(self, username: str, password: str) -> dict:
"""Get_public_access_token.""" """Get_public_access_token."""
( (
keycloak_server_url, keycloak_server_url,
@ -192,7 +190,8 @@ class PublicAuthenticationService:
if public_response.status_code == 200: if public_response.status_code == 200:
public_token = json.loads(public_response.text) public_token = json.loads(public_response.text)
if "access_token" in public_token: if "access_token" in public_token:
return public_token["access_token"] access_token: dict = public_token['access_token']
return access_token
raise ApiError( raise ApiError(
code="no_public_access_token", code="no_public_access_token",
message=f"We could not get a public access token: {username}", message=f"We could not get a public access token: {username}",

View File

@ -13,7 +13,7 @@ class AuthorizationService:
"""Determine whether a user has permission to perform their request.""" """Determine whether a user has permission to perform their request."""
@staticmethod @staticmethod
def get_keycloak_args(): def get_keycloak_args() -> tuple:
"""Get_keycloak_args.""" """Get_keycloak_args."""
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"] keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"] keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
@ -28,9 +28,8 @@ class AuthorizationService:
keycloak_client_secret_key, keycloak_client_secret_key,
) )
def get_user_info_from_id_token(self, token): def get_user_info_from_id_token(self, token: str) -> dict:
"""This seems to work with basic tokens too.""" """This seems to work with basic tokens too."""
json_data = None
( (
keycloak_server_url, keycloak_server_url,
keycloak_client_id, keycloak_client_id,
@ -38,69 +37,56 @@ class AuthorizationService:
keycloak_client_secret_key, keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args() ) = AuthorizationService.get_keycloak_args()
backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}" # backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}"
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii") # backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
base64.b64encode(backend_basic_auth_bytes) # backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
# headers = {"Content-Type": "application/x-www-form-urlencoded",
# "Authorization": f"Bearer {backend_basic_auth.decode('utf-8')}"}
# data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
# 'client_id': keycloak_client_id,
# "subject_token": basic_token,
# "audience": keycloak_client_id}
# bearer_token = self.get_bearer_token(public_access_token)
# if 'error' in bearer_token:
# raise ApiError(code='invalid_token',
# message="Invalid token. Please authenticate.")
# else:
# auth_bearer_string = f"Bearer {id_token_object['access_token']}"
# auth_bearer_string = f"Basic {keycloak_client_secret_key}"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
# data = {
# "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
# "client_id": keycloak_client_id,
# # "subject_token": id_token_object['access_token'],
# "subject_token": token,
# "audience": keycloak_client_id,
# }
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo" request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo"
try: try:
request_response = requests.get(request_url, headers=headers) request_response = requests.get(request_url, headers=headers)
except Exception as e: except Exception as e:
print(f"get_user_from_token: Exception: {e}") current_app.logger.error(f"Exception in get_user_info_from_id_token: {e}")
if request_response.status_code == 200: raise ApiError(code='token_error',
json_data = json.loads(request_response.text) message=f"Exception in get_user_info_from_id_token: {e}",
elif request_response.status_code == 401: status_code=401)
if request_response.status_code == 401:
raise ApiError( raise ApiError(
code="invalid_token", message="Please login", status_code=401 code="invalid_token", message="Please login", status_code=401
) )
return json_data elif request_response.status_code == 200:
user_info: dict = json.loads(request_response.text)
return user_info
def refresh_token(self, token): raise ApiError(code='user_info_error',
"""Refresh_token.""" message=f"Cannot get user info in get_user_info_from_id_token",
# if isinstance(token, str): status_code=401)
# token = eval(token)
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
data = {
"grant_type": "refresh_token",
"client_id": "spiffworkflow-frontend",
"subject_token": token,
"refresh_token": token,
}
refresh_response = requests.post(request_url, headers=headers, data=data)
refresh_token = json.loads(refresh_response.text)
return refresh_token
def get_bearer_token(self, basic_token): # def refresh_token(self, token: str) -> str:
# """Refresh_token."""
# # if isinstance(token, str):
# # token = eval(token)
# (
# keycloak_server_url,
# keycloak_client_id,
# keycloak_realm_name,
# keycloak_client_secret_key,
# ) = AuthorizationService.get_keycloak_args()
# headers = {"Content-Type": "application/x-www-form-urlencoded"}
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
# data = {
# "grant_type": "refresh_token",
# "client_id": "spiffworkflow-frontend",
# "subject_token": token,
# "refresh_token": token,
# }
# refresh_response = requests.post(request_url, headers=headers, data=data)
# refresh_token = json.loads(refresh_response.text)
# return refresh_token
def get_bearer_token(self, basic_token: str) -> dict:
"""Get_bearer_token.""" """Get_bearer_token."""
( (
keycloak_server_url, keycloak_server_url,
@ -128,7 +114,7 @@ class AuthorizationService:
backend_response = requests.post(request_url, headers=headers, data=data) backend_response = requests.post(request_url, headers=headers, data=data)
# json_data = json.loads(backend_response.text) # json_data = json.loads(backend_response.text)
# bearer_token = json_data['access_token'] # bearer_token = json_data['access_token']
bearer_token = json.loads(backend_response.text) bearer_token: dict = json.loads(backend_response.text)
return bearer_token return bearer_token
@staticmethod @staticmethod
@ -156,143 +142,143 @@ class AuthorizationService:
"The Authentication token you provided is invalid. You need a new token. ", "The Authentication token you provided is invalid. You need a new token. ",
) from exception ) from exception
def get_bearer_token_from_internal_token(self, internal_token): # def get_bearer_token_from_internal_token(self, internal_token):
"""Get_bearer_token_from_internal_token.""" # """Get_bearer_token_from_internal_token."""
self.decode_auth_token(internal_token) # self.decode_auth_token(internal_token)
print(f"get_user_by_internal_token: {internal_token}") # print(f"get_user_by_internal_token: {internal_token}")
def introspect_token(self, basic_token): # def introspect_token(self, basic_token: str) -> dict:
"""Introspect_token.""" # """Introspect_token."""
( # (
keycloak_server_url, # keycloak_server_url,
keycloak_client_id, # keycloak_client_id,
keycloak_realm_name, # keycloak_realm_name,
keycloak_client_secret_key, # keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args() # ) = AuthorizationService.get_keycloak_args()
#
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
#
# headers = {
# "Content-Type": "application/x-www-form-urlencoded",
# "Authorization": auth_bearer_string,
# }
# data = {
# "client_id": keycloak_client_id,
# "client_secret": keycloak_client_secret_key,
# "token": basic_token,
# }
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect"
#
# introspect_response = requests.post(request_url, headers=headers, data=data)
# introspection = json.loads(introspect_response.text)
#
# return introspection
bearer_token = AuthorizationService().get_bearer_token(basic_token) # def get_permission_by_basic_token(self, basic_token: dict) -> list:
auth_bearer_string = f"Bearer {bearer_token['access_token']}" # """Get_permission_by_basic_token."""
# (
# keycloak_server_url,
# keycloak_client_id,
# keycloak_realm_name,
# keycloak_client_secret_key,
# ) = AuthorizationService.get_keycloak_args()
#
# # basic_token = AuthorizationService().refresh_token(basic_token)
# # bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
# # auth_bearer_string = f"Bearer {bearer_token['access_token']}"
# auth_bearer_string = f"Bearer {bearer_token}"
#
# headers = {
# "Content-Type": "application/x-www-form-urlencoded",
# "Authorization": auth_bearer_string,
# }
# data = {
# "client_id": keycloak_client_id,
# "client_secret": keycloak_client_secret_key,
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
# "response_mode": "permissions",
# "audience": keycloak_client_id,
# "response_include_resource_name": True,
# }
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
# permission_response = requests.post(request_url, headers=headers, data=data)
# permission = json.loads(permission_response.text)
# return permission
headers = { # def get_auth_status_for_resource_and_scope_by_token(
"Content-Type": "application/x-www-form-urlencoded", # self, basic_token: dict, resource: str, scope: str
"Authorization": auth_bearer_string, # ) -> str:
} # """Get_auth_status_for_resource_and_scope_by_token."""
data = { # (
"client_id": keycloak_client_id, # keycloak_server_url,
"client_secret": keycloak_client_secret_key, # keycloak_client_id,
"token": basic_token, # keycloak_realm_name,
} # keycloak_client_secret_key,
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect" # ) = AuthorizationService.get_keycloak_args()
#
# # basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
#
# headers = {
# "Content-Type": "application/x-www-form-urlencoded",
# "Authorization": auth_bearer_string,
# }
# data = {
# "client_id": keycloak_client_id,
# "client_secret": keycloak_client_secret_key,
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
# "permission": f"{resource}#{scope}",
# "response_mode": "permissions",
# "audience": keycloak_client_id,
# }
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
# auth_response = requests.post(request_url, headers=headers, data=data)
#
# print("get_auth_status_for_resource_and_scope_by_token")
# auth_status: str = json.loads(auth_response.text)
# return auth_status
introspect_response = requests.post(request_url, headers=headers, data=data) # def get_permissions_by_token_for_resource_and_scope(
introspection = json.loads(introspect_response.text) # self, basic_token: str, resource: str|None=None, scope: str|None=None
# ) -> str:
return introspection # """Get_permissions_by_token_for_resource_and_scope."""
# (
def get_permission_by_basic_token(self, basic_token): # keycloak_server_url,
"""Get_permission_by_basic_token.""" # keycloak_client_id,
( # keycloak_realm_name,
keycloak_server_url, # keycloak_client_secret_key,
keycloak_client_id, # ) = AuthorizationService.get_keycloak_args()
keycloak_realm_name, #
keycloak_client_secret_key, # # basic_token = AuthorizationService().refresh_token(basic_token)
) = AuthorizationService.get_keycloak_args() # # bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
# basic_token = AuthorizationService().refresh_token(basic_token) # auth_bearer_string = f"Bearer {bearer_token['access_token']}"
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token']) #
bearer_token = AuthorizationService().get_bearer_token(basic_token) # headers = {
# auth_bearer_string = f"Bearer {bearer_token['access_token']}" # "Content-Type": "application/x-www-form-urlencoded",
auth_bearer_string = f"Bearer {bearer_token}" # "Authorization": auth_bearer_string,
# }
headers = { # permision = ""
"Content-Type": "application/x-www-form-urlencoded", # if resource is not None and resource != '':
"Authorization": auth_bearer_string, # permision += resource
} # if scope is not None and scope != '':
data = { # permision += "#" + scope
"client_id": keycloak_client_id, # data = {
"client_secret": keycloak_client_secret_key, # "client_id": keycloak_client_id,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", # "client_secret": keycloak_client_secret_key,
"response_mode": "permissions", # "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"audience": keycloak_client_id, # "response_mode": "permissions",
"response_include_resource_name": True, # "permission": permision,
} # "audience": keycloak_client_id,
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token" # "response_include_resource_name": True,
permission_response = requests.post(request_url, headers=headers, data=data) # }
permission = json.loads(permission_response.text) # request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
return permission # permission_response = requests.post(request_url, headers=headers, data=data)
# permission: str = json.loads(permission_response.text)
def get_auth_status_for_resource_and_scope_by_token( # return permission
self, basic_token, resource, scope
):
"""Get_auth_status_for_resource_and_scope_by_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
data = {
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": f"{resource}#{scope}",
"response_mode": "permissions",
"audience": keycloak_client_id,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
auth_response = requests.post(request_url, headers=headers, data=data)
print("get_auth_status_for_resource_and_scope_by_token")
auth_status = json.loads(auth_response.text)
return auth_status
def get_permissions_by_token_for_resource_and_scope(
self, basic_token, resource=None, scope=None
):
"""Get_permissions_by_token_for_resource_and_scope."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
permision = ""
if resource:
permision += resource
if scope:
permision += "#" + resource
data = {
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"response_mode": "permissions",
"permission": permision,
"audience": keycloak_client_id,
"response_include_resource_name": True,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission
# def get_resource_set(self, public_access_token, uri): # def get_resource_set(self, public_access_token, uri):
# """Get_resource_set.""" # """Get_resource_set."""
@ -322,8 +308,9 @@ class AuthorizationService:
# #
# print("get_resource_set") # print("get_resource_set")
def get_permission_by_token(self, public_access_token) -> dict: def get_permission_by_token(self, public_access_token: str) -> dict:
"""Get_permission_by_token.""" """Get_permission_by_token."""
# TODO: Write a test for this
( (
keycloak_server_url, keycloak_server_url,
keycloak_client_id, keycloak_client_id,
@ -342,7 +329,7 @@ class AuthorizationService:
} }
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token" request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data) permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text) permission: dict = json.loads(permission_response.text)
return permission return permission

View File

@ -16,21 +16,44 @@ from spiffworkflow_backend.models.user import UserModel
class UserService: class UserService:
"""Provides common tools for working with users.""" """Provides common tools for working with users."""
def create_user(self, service, service_id, name=None, username=None, email=None): def create_user(self, service: str, service_id: str, name: str|None=None, username: str|None=None, email: str|None=None) -> UserModel:
"""Create_user.""" """Create_user."""
user = ( user_model: UserModel|None = (
UserModel.query.filter(UserModel.service == service) UserModel.query.filter(UserModel.service == service)
.filter(UserModel.service_id == service_id) .filter(UserModel.service_id == service_id)
.first() .first()
) )
if name is None: if user_model is None:
name = "" if name is None:
if username is None: name = ""
username = "" if username is None:
if email is None: username = ""
email = "" if email is None:
email = ""
if user is not None: user_model = UserModel(
username=username,
service=service,
service_id=service_id,
name=name,
email=email,
)
db.session.add(user_model)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
raise ApiError(
code="add_user_error", message=f"Could not add user {username}"
) from e
self.create_principal(user_model.id)
return user_model
else:
# TODO: username may be ''.
# not sure what to send in error message.
# Don't really want to send service_id.
raise ( raise (
ApiError( ApiError(
code="user_already_exists", code="user_already_exists",
@ -39,36 +62,6 @@ class UserService:
) )
) )
user = UserModel(
username=username,
service=service,
service_id=service_id,
name=name,
email=email,
)
try:
db.session.add(user)
except IntegrityError as exception:
raise (
ApiError(
code="integrity_error", message=repr(exception), status_code=500
)
) from exception
try:
db.session.commit()
except Exception as e:
db.session.rollback()
raise ApiError(
code="add_user_error", message=f"Could not add user {username}"
) from e
try:
self.create_principal(user.id)
except ApiError as ae:
# TODO: What is the right way to do this
raise ae
return user
# Returns true if the current user is logged in. # Returns true if the current user is logged in.
@staticmethod @staticmethod
def has_user() -> bool: def has_user() -> bool:
@ -243,9 +236,9 @@ class UserService:
message=f"No principal was found for user_id: {user_id}", message=f"No principal was found for user_id: {user_id}",
) )
def create_principal(self, user_id): def create_principal(self, user_id: int) -> PrincipalModel:
"""Create_principal.""" """Create_principal."""
principal = PrincipalModel.query.filter_by(user_id=user_id).first() principal: PrincipalModel | None = PrincipalModel.query.filter_by(user_id=user_id).first()
if principal is None: if principal is None:
principal = PrincipalModel(user_id=user_id) principal = PrincipalModel(user_id=user_id)
db.session.add(principal) db.session.add(principal)

View File

@ -27,7 +27,7 @@ class BaseTest:
) )
@staticmethod @staticmethod
def get_public_access_token(username, password) -> dict: def get_public_access_token(username: str, password: str) -> dict:
"""Get_public_access_token.""" """Get_public_access_token."""
public_access_token = PublicAuthenticationService().get_public_access_token( public_access_token = PublicAuthenticationService().get_public_access_token(
username, password username, password

View File

@ -8,162 +8,155 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationSe
class TestAuthorization(BaseTest): class TestAuthorization(BaseTest):
"""TestAuthorization.""" """TestAuthorization."""
def test_get_bearer_token(self, app: Flask) -> None: # def test_get_bearer_token(self, app: Flask) -> None:
"""Test_get_bearer_token.""" # """Test_get_bearer_token."""
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
public_access_token = self.get_public_access_token(user_id, user_id) # public_access_token = self.get_public_access_token(user_id, user_id)
bearer_token = AuthorizationService().get_bearer_token(public_access_token) # bearer_token = AuthorizationService().get_bearer_token(public_access_token)
assert isinstance(public_access_token, str) # assert isinstance(public_access_token, str)
assert isinstance(bearer_token, dict) # assert isinstance(bearer_token, dict)
assert "access_token" in bearer_token # assert "access_token" in bearer_token
assert isinstance(bearer_token["access_token"], str) # assert isinstance(bearer_token["access_token"], str)
assert "refresh_token" in bearer_token # assert "refresh_token" in bearer_token
assert isinstance(bearer_token["refresh_token"], str) # assert isinstance(bearer_token["refresh_token"], str)
assert "token_type" in bearer_token # assert "token_type" in bearer_token
assert bearer_token["token_type"] == "Bearer" # assert bearer_token["token_type"] == "Bearer"
assert "scope" in bearer_token # assert "scope" in bearer_token
assert isinstance(bearer_token["scope"], str) # assert isinstance(bearer_token["scope"], str)
#
def test_get_user_info_from_public_access_token(self, app: Flask) -> None: # def test_get_user_info_from_public_access_token(self, app: Flask) -> None:
"""Test_get_user_info_from_public_access_token.""" # """Test_get_user_info_from_public_access_token."""
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
public_access_token = self.get_public_access_token(user_id, user_id) # public_access_token = self.get_public_access_token(user_id, user_id)
user_info = AuthorizationService().get_user_info_from_public_access_token( # user_info = AuthorizationService().get_user_info_from_id_token(
public_access_token # public_access_token
) # )
assert "sub" in user_info # assert "sub" in user_info
assert isinstance(user_info["sub"], str) # assert isinstance(user_info["sub"], str)
assert len(user_info["sub"]) == 36 # assert len(user_info["sub"]) == 36
assert "preferred_username" in user_info # assert "preferred_username" in user_info
assert user_info["preferred_username"] == user_id # assert user_info["preferred_username"] == user_id
assert "email" in user_info # assert "email" in user_info
assert user_info["email"] == f"{user_id}@example.com" # assert user_info["email"] == f"{user_id}@example.com"
#
def test_introspect_token(self, app: Flask) -> None: # def test_introspect_token(self, app: Flask) -> None:
"""Test_introspect_token.""" # """Test_introspect_token."""
( # (
keycloak_server_url, # keycloak_server_url,
keycloak_client_id, # keycloak_client_id,
keycloak_realm_name, # keycloak_realm_name,
keycloak_client_secret_key, # keycloak_client_secret_key,
) = self.get_keycloak_constants(app) # ) = self.get_keycloak_constants(app)
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
basic_token = self.get_public_access_token(user_id, user_id) # basic_token = self.get_public_access_token(user_id, user_id)
introspection = AuthorizationService().introspect_token(basic_token) # introspection = AuthorizationService().introspect_token(basic_token)
assert isinstance(introspection, dict) # assert isinstance(introspection, dict)
assert introspection["typ"] == "Bearer" # assert introspection["typ"] == "Bearer"
assert introspection["preferred_username"] == user_id # assert introspection["preferred_username"] == user_id
assert introspection["client_id"] == "spiffworkflow-frontend" # assert introspection["client_id"] == "spiffworkflow-frontend"
#
assert "resource_access" in introspection # assert "resource_access" in introspection
resource_access = introspection["resource_access"] # resource_access = introspection["resource_access"]
assert isinstance(resource_access, dict) # assert isinstance(resource_access, dict)
#
assert keycloak_client_id in resource_access # assert keycloak_client_id in resource_access
client = resource_access[keycloak_client_id] # client = resource_access[keycloak_client_id]
assert "roles" in client # assert "roles" in client
roles = client["roles"] # roles = client["roles"]
#
assert isinstance(roles, list) # assert isinstance(roles, list)
if user_id == "admin_1": # if user_id == "admin_1":
assert len(roles) == 2 # assert len(roles) == 2
for role in roles: # for role in roles:
assert role in ("User", "Admin") # assert role in ("User", "Admin")
elif user_id == "admin_2": # elif user_id == "admin_2":
assert len(roles) == 1 # assert len(roles) == 1
assert roles[0] == "User" # assert roles[0] == "User"
elif user_id == "user_1" or user_id == "user_2": # elif user_id == "user_1" or user_id == "user_2":
assert len(roles) == 2 # assert len(roles) == 2
for role in roles: # for role in roles:
assert role in ("User", "Anonymous") # assert role in ("User", "Anonymous")
#
def test_get_permission_by_token(self, app: Flask) -> None: # def test_get_permission_by_token(self, app: Flask) -> None:
"""Test_get_permission_by_token.""" # """Test_get_permission_by_token."""
output = {} # output: dict = {}
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
output[user_id] = {} # output[user_id] = {}
basic_token = self.get_public_access_token(user_id, user_id) # basic_token = self.get_public_access_token(user_id, user_id)
permissions = AuthorizationService().get_permission_by_basic_token( # permissions = AuthorizationService().get_permission_by_basic_token(
basic_token # basic_token
) # )
if isinstance(permissions, list): # if isinstance(permissions, list):
for permission in permissions: # for permission in permissions:
resource_name = permission["rsname"] # resource_name = permission["rsname"]
output[user_id][resource_name] = {} # output[user_id][resource_name] = {}
# assert resource_name in resource_names # # assert resource_name in resource_names
# if resource_name == 'Process Groups' or resource_name == 'Process Models': # # if resource_name == 'Process Groups' or resource_name == 'Process Models':
if "scopes" in permission: # if "scopes" in permission:
# assert 'scopes' in permission # scopes = permission["scopes"]
scopes = permission["scopes"] # output[user_id][resource_name]["scopes"] = scopes
output[user_id][resource_name]["scopes"] = scopes #
# assert isinstance(scopes, list) # # if user_id == 'admin_1':
# assert len(scopes) == 1 # # # assert len(permissions) == 3
# assert scopes[0] == 'read' # # for permission in permissions:
# else: # # resource_name = permission['rsname']
# # assert 'scopes' not in permission # # # assert resource_name in resource_names
# ... # # if resource_name == 'Process Groups' or resource_name == 'Process Models':
# # # assert len(permission['scopes']) == 4
# if user_id == 'admin_1': # # for item in permission['scopes']:
# # assert len(permissions) == 3 # # # assert item in ('instantiate', 'read', 'update', 'delete')
# for permission in permissions: # # ...
# resource_name = permission['rsname'] # # else:
# # assert resource_name in resource_names # # # assert resource_name == 'Default Resource'
# if resource_name == 'Process Groups' or resource_name == 'Process Models': # # # assert 'scopes' not in permission
# # assert len(permission['scopes']) == 4 # # ...
# for item in permission['scopes']: # #
# # assert item in ('instantiate', 'read', 'update', 'delete') # # if user_id == 'admin_2':
# ... # # # assert len(permissions) == 3
# else: # # for permission in permissions:
# # assert resource_name == 'Default Resource' # # resource_name = permission['rsname']
# # assert 'scopes' not in permission # # # assert resource_name in resource_names
# ... # # if resource_name == 'Process Groups' or resource_name == 'Process Models':
# # # # assert len(permission['scopes']) == 1
# if user_id == 'admin_2': # # # assert permission['scopes'][0] == 'read'
# # assert len(permissions) == 3 # # ...
# for permission in permissions: # # else:
# resource_name = permission['rsname'] # # # assert resource_name == 'Default Resource'
# # assert resource_name in resource_names # # # assert 'scopes' not in permission
# if resource_name == 'Process Groups' or resource_name == 'Process Models': # # ...
# # assert len(permission['scopes']) == 1 # # else:
# # assert permission['scopes'][0] == 'read' # # print(f"No Permissions: {permissions}")
# ... # print("test_get_permission_by_token")
# else: #
# # assert resource_name == 'Default Resource' # def test_get_auth_status_for_resource_and_scope_by_token(self, app: Flask) -> None:
# # assert 'scopes' not in permission # """Test_get_auth_status_for_resource_and_scope_by_token."""
# ... # resources = "Admin", "Process Groups", "Process Models"
else: # # scope = 'read'
print(f"No Permissions: {permissions}") # output: dict = {}
print("test_get_permission_by_token") # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# output[user_id] = {}
def test_get_auth_status_for_resource_and_scope_by_token(self, app: Flask) -> None: # basic_token = self.get_public_access_token(user_id, user_id)
"""Test_get_auth_status_for_resource_and_scope_by_token.""" # for resource in resources:
resources = "Admin", "Process Groups", "Process Models" # output[user_id][resource] = {}
# scope = 'read' # for scope in "instantiate", "read", "update", "delete":
output = {} # auth_status = AuthorizationService().get_auth_status_for_resource_and_scope_by_token(
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # basic_token, resource, scope
output[user_id] = {} # )
basic_token = self.get_public_access_token(user_id, user_id) # output[user_id][resource][scope] = auth_status
for resource in resources: # print("test_get_auth_status_for_resource_and_scope_by_token")
output[user_id][resource] = {} #
for scope in "instantiate", "read", "update", "delete": # def test_get_permissions_by_token_for_resource_and_scope(self, app: Flask) -> None:
auth_status = AuthorizationService().get_auth_status_for_resource_and_scope_by_token( # """Test_get_permissions_by_token_for_resource_and_scope."""
basic_token, resource, scope # resource_names = "Default Resource", "Process Groups", "Process Models"
) # output: dict = {}
output[user_id][resource][scope] = auth_status # for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
print("test_get_auth_status_for_resource_and_scope_by_token") # output[user_id] = {}
# basic_token = self.get_public_access_token(user_id, user_id)
def test_get_permissions_by_token_for_resource_and_scope(self, app: Flask): # for resource in resource_names:
"""Test_get_permissions_by_token_for_resource_and_scope.""" # output[user_id][resource] = {}
resource_names = "Default Resource", "Process Groups", "Process Models" # for scope in "instantiate", "read", "update", "delete":
output = {} # permissions = AuthorizationService().get_permissions_by_token_for_resource_and_scope(
for user_id in ("user_1", "user_2", "admin_1", "admin_2"): # basic_token, resource, scope
output[user_id] = {} # )
basic_token = self.get_public_access_token(user_id, user_id) # output[user_id][resource][scope] = permissions
for resource in resource_names: # print("test_get_permissions_by_token_for_resource_and_scope")
output[user_id][resource] = {}
for scope in "instantiate", "read", "update", "delete":
permissions = AuthorizationService().get_permissions_by_token_for_resource_and_scope(
basic_token, resource, scope
)
output[user_id][resource][scope] = permissions
print("test_get_permissions_by_token_for_resource_and_scope")