remove the unused next_task key from api calls since nobody uses it w/ burnettk essweine (#688)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-11-17 16:34:04 -05:00 committed by GitHub
parent 5e95849923
commit b031b0f940
5 changed files with 22 additions and 71 deletions

View File

@ -5,7 +5,6 @@ from typing import Any
import marshmallow
from marshmallow import INCLUDE
from marshmallow import Schema
from marshmallow_enum import EnumField # type: ignore
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
@ -15,8 +14,6 @@ from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskSchema
from spiffworkflow_backend.models.user import UserModel
@ -189,15 +186,13 @@ class ProcessInstanceApi:
def __init__(
self,
id: int,
status: ProcessInstanceStatus,
next_task: Task | None,
status: str,
process_model_identifier: str,
process_model_display_name: str,
updated_at_in_seconds: int,
) -> None:
self.id = id
self.status = status
self.next_task = next_task # The next task that requires user input.
self.process_model_identifier = process_model_identifier
self.process_model_display_name = process_model_display_name
self.updated_at_in_seconds = updated_at_in_seconds
@ -209,26 +204,20 @@ class ProcessInstanceApiSchema(Schema):
fields = [
"id",
"status",
"next_task",
"process_model_identifier",
"process_model_display_name",
"updated_at_in_seconds",
]
unknown = INCLUDE
status = EnumField(ProcessInstanceStatus)
next_task = marshmallow.fields.Nested(TaskSchema, dump_only=True, required=False)
@marshmallow.post_load
def make_process_instance(self, data: dict[str, Any], **kwargs: dict) -> ProcessInstanceApi:
keys = [
"id",
"status",
"next_task",
"process_model_identifier",
"process_model_display_name",
"updated_at_in_seconds",
]
filtered_fields = {key: data[key] for key in keys}
filtered_fields["next_task"] = TaskSchema().make_task(data["next_task"])
return ProcessInstanceApi(**filtered_fields)

View File

@ -89,7 +89,7 @@ def process_instance_create(
def _process_instance_run(
process_instance: ProcessInstanceModel,
) -> ProcessInstanceProcessor | None:
) -> None:
if process_instance.status != "not_started":
raise ApiError(
error_code="process_instance_not_runnable",
@ -125,34 +125,18 @@ def _process_instance_run(
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP"]:
MessageService.correlate_all_message_instances()
return processor
def process_instance_run(
modified_process_model_identifier: str,
process_instance_id: int,
) -> flask.wrappers.Response:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
processor = _process_instance_run(process_instance)
_process_instance_run(process_instance)
# for mypy
if processor is not None:
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
process_instance_data = processor.get_data()
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(process_instance)
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
return make_response(jsonify(process_instance), 200)
def _process_instance_start(
process_model_identifier: str,
) -> tuple[ProcessInstanceModel, ProcessInstanceProcessor | None]:
process_instance = _process_instance_create(process_model_identifier)
processor = _process_instance_run(process_instance)
return process_instance, processor
def process_instance_terminate(
process_instance_id: int,

View File

@ -289,33 +289,14 @@ class ProcessInstanceService:
return processor
@staticmethod
def processor_to_process_instance_api(
processor: ProcessInstanceProcessor, next_task: None = None
) -> ProcessInstanceApi:
"""Returns an API model representing the state of the current process_instance.
If requested, and possible, next_task is set to the current_task.
"""
# navigation = processor.bpmn_process_instance.get_deep_nav_list()
# ProcessInstanceService.update_navigation(navigation, processor)
ProcessModelService.get_process_model(processor.process_model_identifier)
def processor_to_process_instance_api(process_instance: ProcessInstanceModel) -> ProcessInstanceApi:
"""Returns an API model representing the state of the current process_instance."""
process_instance_api = ProcessInstanceApi(
id=processor.get_process_instance_id(),
status=processor.get_status(),
next_task=None,
process_model_identifier=processor.process_model_identifier,
process_model_display_name=processor.process_model_display_name,
updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds,
)
next_task_trying_again = next_task
if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
# This may or may not work, sometimes there is no next task to complete.
next_task_trying_again = processor.next_task()
if next_task_trying_again is not None:
process_instance_api.next_task = ProcessInstanceService.spiff_task_to_api_task(
processor, next_task_trying_again, add_docs_and_forms=True
id=process_instance.id,
status=process_instance.status,
process_model_identifier=process_instance.process_model_identifier,
process_model_display_name=process_instance.process_model_display_name,
updated_at_in_seconds=process_instance.updated_at_in_seconds,
)
return process_instance_api

View File

@ -1318,8 +1318,6 @@ class TestProcessApi(BaseTest):
assert response.json["updated_at_in_seconds"] > 0
assert response.json["status"] == "complete"
assert response.json["process_model_identifier"] == process_model.id
assert response.json["data"]["Mike"] == "Awesome"
assert response.json["data"]["person"] == "Kevin"
def test_process_instance_show(
self,

View File

@ -46,10 +46,9 @@ class TestTasksController(BaseTest):
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model.id)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
# Call this to assure all engine-steps are fully processed.
_dequeued_interstitial_stream(process_instance_id)
assert response.json is not None
assert response.json["next_task"] is not None
human_tasks = (
db.session.query(HumanTaskModel).filter(HumanTaskModel.process_instance_id == process_instance_id).all()
@ -214,11 +213,11 @@ class TestTasksController(BaseTest):
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model.id)}/{process_instance_id}/run",
headers=headers,
)
assert response.status_code == 200
assert response.json is not None
assert response.json["next_task"] is not None
assert response.json["next_task"]["state"] == "READY"
assert response.json["next_task"]["title"] == "Script Task #2"
task_model = TaskModel.query.filter_by(process_instance_id=process_instance_id, state="READY").first()
assert task_model is not None
assert task_model.task_definition.bpmn_name == "Script Task #2"
# Rather that call the API and deal with the Server Side Events, call the loop directly and covert it to
# a list. It tests all of our code. No reason to test Flasks SSE support.
@ -481,10 +480,10 @@ class TestTasksController(BaseTest):
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
assert "next_task" in response.json
task_guid = response.json["next_task"]["id"]
assert task_guid is not None
task_model = TaskModel.query.filter_by(process_instance_id=process_instance_id, state="READY").first()
assert task_model is not None
task_guid = task_model.guid
# log in a guest user to complete the tasks
redirect_url = "/test-redirect-dne"