mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-02-26 08:05:23 +00:00
call dequeue method for interstitial page tests w/ burnettk
This commit is contained in:
parent
17b01444a9
commit
013a2cb34a
@ -430,7 +430,8 @@ def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Op
|
|||||||
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
|
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
|
||||||
with ProcessInstanceQueueService.dequeued(process_instance):
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
yield from _interstitial_stream(process_instance)
|
yield from _interstitial_stream(process_instance)
|
||||||
|
|
||||||
|
|
||||||
def interstitial(process_instance_id: int) -> Response:
|
def interstitial(process_instance_id: int) -> Response:
|
||||||
"""A Server Side Events Stream for watching the execution of engine tasks."""
|
"""A Server Side Events Stream for watching the execution of engine tasks."""
|
||||||
return Response(
|
return Response(
|
||||||
|
@ -9,7 +9,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
|||||||
from spiffworkflow_backend import db
|
from spiffworkflow_backend import db
|
||||||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.routes.tasks_controller import _interstitial_stream
|
from spiffworkflow_backend.routes.tasks_controller import _dequeued_interstitial_stream
|
||||||
|
|
||||||
|
|
||||||
class TestForGoodErrors(BaseTest):
|
class TestForGoodErrors(BaseTest):
|
||||||
@ -22,7 +22,7 @@ class TestForGoodErrors(BaseTest):
|
|||||||
with_super_admin_user: UserModel,
|
with_super_admin_user: UserModel,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
# Call this to assure all engine-steps are fully processed before we search for human tasks.
|
# Call this to assure all engine-steps are fully processed before we search for human tasks.
|
||||||
_interstitial_stream(process_instance_id)
|
_dequeued_interstitial_stream(process_instance_id)
|
||||||
|
|
||||||
"""Returns the next available user task for a given process instance, if possible."""
|
"""Returns the next available user task for a given process instance, if possible."""
|
||||||
human_tasks = (
|
human_tasks = (
|
||||||
|
@ -33,7 +33,7 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
|||||||
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
|
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
|
||||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.routes.tasks_controller import _interstitial_stream
|
from spiffworkflow_backend.routes.tasks_controller import _dequeued_interstitial_stream
|
||||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||||
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
|
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
|
||||||
@ -1630,7 +1630,7 @@ class TestProcessApi(BaseTest):
|
|||||||
headers=self.logged_in_headers(with_super_admin_user),
|
headers=self.logged_in_headers(with_super_admin_user),
|
||||||
)
|
)
|
||||||
# Call this to assure all engine-steps are fully processed.
|
# Call this to assure all engine-steps are fully processed.
|
||||||
_interstitial_stream(process_instance_id)
|
_dequeued_interstitial_stream(process_instance_id)
|
||||||
assert response.json is not None
|
assert response.json is not None
|
||||||
assert response.json["next_task"] is not None
|
assert response.json["next_task"] is not None
|
||||||
|
|
||||||
@ -1694,7 +1694,7 @@ class TestProcessApi(BaseTest):
|
|||||||
|
|
||||||
# Rather that call the API and deal with the Server Side Events, call the loop directly and covert it to
|
# Rather that call the API and deal with the Server Side Events, call the loop directly and covert it to
|
||||||
# a list. It tests all of our code. No reason to test Flasks SSE support.
|
# a list. It tests all of our code. No reason to test Flasks SSE support.
|
||||||
stream_results = _interstitial_stream(process_instance_id)
|
stream_results = _dequeued_interstitial_stream(process_instance_id)
|
||||||
results = list(stream_results)
|
results = list(stream_results)
|
||||||
# strip the "data:" prefix and convert remaining string to dict.
|
# strip the "data:" prefix and convert remaining string to dict.
|
||||||
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
||||||
@ -1717,7 +1717,7 @@ class TestProcessApi(BaseTest):
|
|||||||
assert response.json is not None
|
assert response.json is not None
|
||||||
|
|
||||||
# we should now be on a task that does not belong to the original user, and the interstitial page should know this.
|
# we should now be on a task that does not belong to the original user, and the interstitial page should know this.
|
||||||
results = list(_interstitial_stream(process_instance_id))
|
results = list(_dequeued_interstitial_stream(process_instance_id))
|
||||||
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
||||||
assert len(results) == 1
|
assert len(results) == 1
|
||||||
assert json_results[0]["state"] == "READY"
|
assert json_results[0]["state"] == "READY"
|
||||||
@ -1732,9 +1732,9 @@ class TestProcessApi(BaseTest):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# We should now be on the end task with a valid message, even after loading it many times.
|
# We should now be on the end task with a valid message, even after loading it many times.
|
||||||
list(_interstitial_stream(process_instance_id))
|
list(_dequeued_interstitial_stream(process_instance_id))
|
||||||
list(_interstitial_stream(process_instance_id))
|
list(_dequeued_interstitial_stream(process_instance_id))
|
||||||
results = list(_interstitial_stream(process_instance_id))
|
results = list(_dequeued_interstitial_stream(process_instance_id))
|
||||||
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
json_results = list(map(lambda x: json.loads(x[5:]), results)) # type: ignore
|
||||||
assert len(json_results) == 1
|
assert len(json_results) == 1
|
||||||
assert json_results[0]["state"] == "COMPLETED"
|
assert json_results[0]["state"] == "COMPLETED"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user