add mccabe linter and auto fixes (#302)

Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
Kevin Burnett 2023-06-08 21:26:33 +00:00 committed by GitHub
parent 4fc6b1a193
commit 35543cc362
10 changed files with 15 additions and 18 deletions

View File

@ -168,6 +168,7 @@ explicit_package_bases = false
[tool.ruff] [tool.ruff]
select = [ select = [
"B", # flake8-bugbear "B", # flake8-bugbear
"C", # mccabe
"E", # pycodestyle error "E", # pycodestyle error
# "ERA", # eradicate # "ERA", # eradicate
"F", # pyflakes "F", # pyflakes
@ -180,6 +181,7 @@ select = [
] ]
ignore = [ ignore = [
"C901", # "complexity" category
"PLR", # "refactoring" category has "too many lines in method" type stuff "PLR", # "refactoring" category has "too many lines in method" type stuff
"PLC1901", "PLC1901",
"PLE1205" # saw this Too many arguments for `logging` format string give a false positive once "PLE1205" # saw this Too many arguments for `logging` format string give a false positive once

View File

@ -440,7 +440,7 @@ def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[st
def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int: def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int:
return len(list([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual])) return len([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual])
def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str | None, str | None, None]: def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str | None, str | None, None]:

View File

@ -36,4 +36,4 @@ class ProcessCallerService:
.filter(ProcessCallerCacheModel.process_identifier == process_id) .filter(ProcessCallerCacheModel.process_identifier == process_id)
.all() .all()
) )
return list(set(map(lambda r: r.calling_process_identifier, records))) # type: ignore return list({r.calling_process_identifier for r in records})

View File

@ -134,7 +134,7 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
return {} return {}
def last_result(self) -> dict[str, Any]: def last_result(self) -> dict[str, Any]:
return {k: v for k, v in self._last_result.items()} return dict(self._last_result.items())
def clear_state(self) -> None: def clear_state(self) -> None:
pass pass
@ -212,7 +212,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
return {k: v for k, v in self.state.items() if k not in keys_to_filter and not callable(v)} return {k: v for k, v in self.state.items() if k not in keys_to_filter and not callable(v)}
def last_result(self) -> dict[str, Any]: def last_result(self) -> dict[str, Any]:
return {k: v for k, v in self.state.items()} return dict(self.state.items())
def clear_state(self) -> None: def clear_state(self) -> None:
self.state = {} self.state = {}

View File

@ -126,7 +126,7 @@ class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelega
spiff_task.run() spiff_task.run()
def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> SpiffTask | None: def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> SpiffTask | None:
ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)]) ready_tasks = list(bpmn_process_instance.get_tasks(TaskState.READY))
if len(ready_tasks) > 0: if len(ready_tasks) > 0:
return ready_tasks[0] return ready_tasks[0]
return None return None

View File

@ -455,7 +455,7 @@ class TaskService:
) -> None: ) -> None:
"""Update given spiff tasks in the database and remove deleted tasks.""" """Update given spiff tasks in the database and remove deleted tasks."""
# Remove all the deleted/pruned tasks from the database. # Remove all the deleted/pruned tasks from the database.
deleted_task_guids = list(map(lambda t: str(t.id), deleted_spiff_tasks)) deleted_task_guids = [str(t.id) for t in deleted_spiff_tasks]
tasks_to_clear = TaskModel.query.filter(TaskModel.guid.in_(deleted_task_guids)).all() # type: ignore tasks_to_clear = TaskModel.query.filter(TaskModel.guid.in_(deleted_task_guids)).all() # type: ignore
human_tasks_to_clear = HumanTaskModel.query.filter( human_tasks_to_clear = HumanTaskModel.query.filter(
HumanTaskModel.task_id.in_(deleted_task_guids) # type: ignore HumanTaskModel.task_id.in_(deleted_task_guids) # type: ignore

View File

@ -91,7 +91,7 @@ class ExecutionStrategy:
self.delegate.save(bpmn_process_instance) self.delegate.save(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual]) tasks = [t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual]
if len(tasks) > 0: if len(tasks) > 0:
self.subprocess_spec_loader() self.subprocess_spec_loader()

View File

@ -15,11 +15,6 @@ class WorkflowService:
@classmethod @classmethod
def next_start_event_configuration(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> StartConfiguration | None: def next_start_event_configuration(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> StartConfiguration | None:
start_events = cls.future_start_events(workflow) start_events = cls.future_start_events(workflow)
configurations = list( configurations = [start_event.task_spec.configuration(start_event, now_in_utc) for start_event in start_events]
map(
lambda start_event: start_event.task_spec.configuration(start_event, now_in_utc), # type: ignore
start_events,
)
)
configurations.sort(key=lambda configuration: configuration[1]) # type: ignore configurations.sort(key=lambda configuration: configuration[1]) # type: ignore
return configurations[0] if len(configurations) > 0 else None return configurations[0] if len(configurations) > 0 else None

View File

@ -51,7 +51,7 @@ class BaseTest:
@staticmethod @staticmethod
def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> dict[str, str]: def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> dict[str, str]:
return dict(Authorization="Bearer " + user.encode_auth_token()) return {"Authorization": "Bearer " + user.encode_auth_token()}
def create_group_and_model_with_bpmn( def create_group_and_model_with_bpmn(
self, self,

View File

@ -1725,7 +1725,7 @@ class TestProcessApi(BaseTest):
stream_results = _dequeued_interstitial_stream(process_instance_id) stream_results = _dequeued_interstitial_stream(process_instance_id)
results = list(stream_results) results = list(stream_results)
# strip the "data:" prefix and convert remaining string to dict. # strip the "data:" prefix and convert remaining string to dict.
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore json_results = [json.loads(x[5:]) for x in results] # type: ignore
# There should be 2 results back - # There should be 2 results back -
# the first script task should not be returned (it contains no end user instructions) # the first script task should not be returned (it contains no end user instructions)
# The second script task should produce rendered jinja text # The second script task should produce rendered jinja text
@ -1746,7 +1746,7 @@ class TestProcessApi(BaseTest):
# we should now be on a task that does not belong to the original user, and the interstitial page should know this. # we should now be on a task that does not belong to the original user, and the interstitial page should know this.
results = list(_dequeued_interstitial_stream(process_instance_id)) results = list(_dequeued_interstitial_stream(process_instance_id))
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore json_results = [json.loads(x[5:]) for x in results] # type: ignore
assert len(results) == 1 assert len(results) == 1
assert json_results[0]["task"]["state"] == "READY" assert json_results[0]["task"]["state"] == "READY"
assert json_results[0]["task"]["can_complete"] is False assert json_results[0]["task"]["can_complete"] is False
@ -1760,7 +1760,7 @@ class TestProcessApi(BaseTest):
processor.save() processor.save()
results = list(_dequeued_interstitial_stream(process_instance_id)) results = list(_dequeued_interstitial_stream(process_instance_id))
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore json_results = [json.loads(x[5:]) for x in results] # type: ignore
assert len(results) == 1 assert len(results) == 1
assert json_results[0]["task"]["state"] == "READY" assert json_results[0]["task"]["state"] == "READY"
assert json_results[0]["task"]["can_complete"] is False assert json_results[0]["task"]["can_complete"] is False
@ -1777,7 +1777,7 @@ class TestProcessApi(BaseTest):
list(_dequeued_interstitial_stream(process_instance_id)) list(_dequeued_interstitial_stream(process_instance_id))
list(_dequeued_interstitial_stream(process_instance_id)) list(_dequeued_interstitial_stream(process_instance_id))
results = list(_dequeued_interstitial_stream(process_instance_id)) results = list(_dequeued_interstitial_stream(process_instance_id))
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore json_results = [json.loads(x[5:]) for x in results] # type: ignore
assert len(json_results) == 1 assert len(json_results) == 1
assert json_results[0]["task"]["state"] == "COMPLETED" assert json_results[0]["task"]["state"] == "COMPLETED"
assert json_results[0]["task"]["properties"]["instructionsForEndUser"] == "I am the end task" assert json_results[0]["task"]["properties"]["instructionsForEndUser"] == "I am the end task"