added some more typed dicts w/ burnettk
This commit is contained in:
parent
54e9685d5e
commit
9ccf99eb4f
|
@ -351,4 +351,4 @@ zstd = ["zstandard (>=0.18.0)"]
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "731eb1f75f491b8d8588d03b91d010b5a11f344d45c933aeeee1d478d3b7d7d5"
|
||||
content-hash = "06fcba658e40f009132662bd727c73d2dba09b6201f7149fe1384f770e0ce661"
|
||||
|
|
|
@ -9,6 +9,7 @@ packages = [{include = "spiffworkflow_connector_command", from = "src" }]
|
|||
[tool.poetry.dependencies]
|
||||
python = "^3.9"
|
||||
requests = "^2.28.2"
|
||||
typing-extensions = "^4.8.0"
|
||||
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
|
|
|
@ -1,26 +1,52 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import TypedDict
|
||||
|
||||
if sys.version_info < (3, 11):
|
||||
from typing_extensions import NotRequired
|
||||
from typing_extensions import TypedDict
|
||||
else:
|
||||
from typing import NotRequired
|
||||
from typing import TypedDict
|
||||
|
||||
|
||||
class CommandErrorDict(TypedDict):
|
||||
error_name: str
|
||||
message: str
|
||||
|
||||
|
||||
class CommandResponseDict(TypedDict):
|
||||
response: dict
|
||||
"""This is passed back to spiffworkflow-backend as the response body."""
|
||||
|
||||
# this is given to the service task as task data
|
||||
command_response: dict
|
||||
error: CommandErrorDict | None
|
||||
|
||||
# these are printed to spiffworkflow-backend logs
|
||||
spiff__logs: NotRequired[list[str]| None]
|
||||
|
||||
# added by spiffworkflow-proxy if not set
|
||||
# example: http/GetRequestV2
|
||||
operator_id: NotRequired[str]
|
||||
|
||||
|
||||
class CommandResultDict(TypedDict):
|
||||
"""spiffworkflow-proxy parses this result to determine what happended."""
|
||||
response: CommandResponseDict
|
||||
status: int
|
||||
mimetype: str
|
||||
|
||||
|
||||
class CommandInterface(metaclass=abc.ABCMeta):
|
||||
class ConnectorCommand(metaclass=abc.ABCMeta):
|
||||
"""Abstract class to describe how to make a command."""
|
||||
|
||||
@classmethod
|
||||
def __subclasshook__(cls, subclass: Any) -> bool:
|
||||
return (
|
||||
hasattr(subclass, "execute")
|
||||
and callable(subclass.run)
|
||||
and NotImplemented
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, config: Any, task_data: dict) -> CommandResponseDict:
|
||||
def execute(self, config: Any, task_data: dict) -> CommandResultDict:
|
||||
raise NotImplementedError("method must be implemented on subclass: execute")
|
||||
|
||||
# this is not a required method but if it gets used then it must be overridden
|
||||
def app_description(self, *args: Any, **kwargs: Any) -> dict:
|
||||
"""Return a dict to describe the connector. This is used only for authentication commands at the moment."""
|
||||
raise NotImplementedError("method must be implemented on subclass: app_description")
|
||||
|
|
Loading…
Reference in New Issue