call dequeue method for interstitial page tests w/ burnettk

This commit is contained in:
jasquat 2023-04-24 15:48:20 -04:00
parent c19e59757d
commit 41fead68f2
No known key found for this signature in database
3 changed files with 11 additions and 10 deletions

View File

@ -430,7 +430,8 @@ def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Op
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
with ProcessInstanceQueueService.dequeued(process_instance):
yield from _interstitial_stream(process_instance)
def interstitial(process_instance_id: int) -> Response:
"""A Server Side Events Stream for watching the execution of engine tasks."""
return Response(

View File

@ -9,7 +9,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend import db
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.tasks_controller import _interstitial_stream
from spiffworkflow_backend.routes.tasks_controller import _dequeued_interstitial_stream
class TestForGoodErrors(BaseTest):
@ -22,7 +22,7 @@ class TestForGoodErrors(BaseTest):
with_super_admin_user: UserModel,
) -> Any:
# Call this to assure all engine-steps are fully processed before we search for human tasks.
_interstitial_stream(process_instance_id)
_dequeued_interstitial_stream(process_instance_id)
"""Returns the next available user task for a given process instance, if possible."""
human_tasks = (

View File

@ -33,7 +33,7 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.tasks_controller import _interstitial_stream
from spiffworkflow_backend.routes.tasks_controller import _dequeued_interstitial_stream
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
@ -1630,7 +1630,7 @@ class TestProcessApi(BaseTest):
headers=self.logged_in_headers(with_super_admin_user),
)
# Call this to assure all engine-steps are fully processed.
_interstitial_stream(process_instance_id)
_dequeued_interstitial_stream(process_instance_id)
assert response.json is not None
assert response.json["next_task"] is not None
@ -1694,7 +1694,7 @@ class TestProcessApi(BaseTest):
# Rather that call the API and deal with the Server Side Events, call the loop directly and covert it to
# a list. It tests all of our code. No reason to test Flasks SSE support.
stream_results = _interstitial_stream(process_instance_id)
stream_results = _dequeued_interstitial_stream(process_instance_id)
results = list(stream_results)
# strip the "data:" prefix and convert remaining string to dict.
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
@ -1717,7 +1717,7 @@ class TestProcessApi(BaseTest):
assert response.json is not None
# we should now be on a task that does not belong to the original user, and the interstitial page should know this.
results = list(_interstitial_stream(process_instance_id))
results = list(_dequeued_interstitial_stream(process_instance_id))
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
assert len(results) == 1
assert json_results[0]["state"] == "READY"
@ -1732,9 +1732,9 @@ class TestProcessApi(BaseTest):
)
# We should now be on the end task with a valid message, even after loading it many times.
list(_interstitial_stream(process_instance_id))
list(_interstitial_stream(process_instance_id))
results = list(_interstitial_stream(process_instance_id))
list(_dequeued_interstitial_stream(process_instance_id))
list(_dequeued_interstitial_stream(process_instance_id))
results = list(_dequeued_interstitial_stream(process_instance_id))
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
assert len(json_results) == 1
assert json_results[0]["state"] == "COMPLETED"