WIP - some tests are now passing and some are failing w/ burnettk
parent c5b85fd404
commit 92b021e708

(One file's diff is suppressed because it is too large.)
@@ -159,6 +159,7 @@ class MessageService:
     ) -> None:
         """process_message_receive."""
         processor_receive = ProcessInstanceProcessor(process_instance_receive)
+        # import pdb; pdb.set_trace()
         processor_receive.bpmn_process_instance.catch_bpmn_message(message_model_name, message_payload)
         processor_receive.do_engine_steps(save=True)
         message_instance_receive.status = MessageStatuses.completed.value
@@ -1650,7 +1650,7 @@ class ProcessInstanceProcessor:
             and self._script_engine.failing_spiff_task is not None
         ):
             self._script_engine.failing_spiff_task = None
-        with open("do_engine_steps.json", 'w') as f: f.write(json.dumps(self.serialize(), indent=2))
+        # with open("do_engine_steps.json", 'w') as f: f.write(json.dumps(self.serialize(), indent=2))

    @classmethod
    def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
@@ -26,6 +26,11 @@ from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
 from spiffworkflow_backend.models.task import TaskModel  # noqa: F401


+class StartAndEndTimes(TypedDict):
+    start_in_seconds: Optional[float]
+    end_in_seconds: Optional[float]
+
+
 class JsonDataDict(TypedDict):
     hash: str
     data: dict
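
As a quick, self-contained illustration (not part of the diff), this is how the new StartAndEndTimes TypedDict is meant to be filled in: the start is known when a task begins and the end is filled in when it completes. Only the type itself comes from the change above; the surrounding lines are a minimal sketch.

import time
from typing import Optional, TypedDict


class StartAndEndTimes(TypedDict):
    start_in_seconds: Optional[float]
    end_in_seconds: Optional[float]


# the start is known immediately; the end stays None until the task finishes
times: StartAndEndTimes = {"start_in_seconds": time.time(), "end_in_seconds": None}
times["end_in_seconds"] = time.time()
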
@@ -108,30 +113,46 @@ class TaskService:
         self,
         spiff_task: SpiffTask,
         task_failed: bool = False,
+        start_and_end_times: Optional[StartAndEndTimes] = None,
     ) -> TaskModel:
-        (
-            new_bpmn_process,
-            task_model,
-            new_task_models,
-            new_json_data_dicts,
-        ) = self.__class__.find_or_create_task_model_from_spiff_task(
-            spiff_task,
-            self.process_instance,
-            self.serializer,
-            bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
-        )
-        bpmn_process = new_bpmn_process or task_model.bpmn_process
-        bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(
-            bpmn_process, spiff_task.workflow.data
-        )
+        new_bpmn_process = None
+        if str(spiff_task.id) in self.task_models:
+            task_model = self.task_models[str(spiff_task.id)]
+        else:
+            (
+                new_bpmn_process,
+                task_model,
+                new_task_models,
+                new_json_data_dicts,
+            ) = self.__class__.find_or_create_task_model_from_spiff_task(
+                spiff_task,
+                self.process_instance,
+                self.serializer,
+                bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
+            )
         self.task_models.update(new_task_models)
         self.json_data_dicts.update(new_json_data_dicts)

+        # we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value
+        bpmn_process = new_bpmn_process or task_model.bpmn_process or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first()
+
+        try:
+            bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(
+                bpmn_process, spiff_task.workflow.data
+            )
+        except Exception as ex:
+            import pdb; pdb.set_trace()
+            print("HEY90823")
         json_data_dict_list = self.__class__.update_task_model(task_model, spiff_task, self.serializer)
         self.task_models[task_model.guid] = task_model
         if bpmn_process_json_data is not None:
             json_data_dict_list.append(bpmn_process_json_data)
         self.update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts)

+        if start_and_end_times:
+            task_model.start_in_seconds = start_and_end_times['start_in_seconds']
+            task_model.end_in_seconds = start_and_end_times['end_in_seconds']
+
         if task_model.state == "COMPLETED" or task_failed:
             event_type = ProcessInstanceEventType.task_completed.value
             if task_failed:
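
The reworked update_task_model_with_spiff_task follows a cache-then-fallback pattern: reuse a task model already tracked in self.task_models, build one otherwise, and recover the bpmn_process by its id when the relationship is unexpectedly None. A rough sketch of that pattern, with plain dicts and placeholder helpers standing in for the SQLAlchemy models and the real find_or_create/query calls:

from typing import Optional

task_models: dict[str, dict] = {}  # stand-in for the service's self.task_models cache


def find_or_create_task_model(task_guid: str) -> dict:
    # placeholder for find_or_create_task_model_from_spiff_task
    return {"guid": task_guid, "bpmn_process": None, "bpmn_process_id": 42}


def query_bpmn_process_by_id(bpmn_process_id: Optional[int]) -> Optional[dict]:
    # placeholder for BpmnProcessModel.query.filter_by(id=...).first()
    return {"id": bpmn_process_id} if bpmn_process_id is not None else None


def update_task_model(task_guid: str) -> dict:
    if task_guid in task_models:
        task_model = task_models[task_guid]  # already built earlier in this run
    else:
        task_model = find_or_create_task_model(task_guid)
        task_models[task_guid] = task_model
    # bpmn_process can be None even when bpmn_process_id is set, so fall back to a lookup by id
    bpmn_process = task_model["bpmn_process"] or query_bpmn_process_by_id(task_model["bpmn_process_id"])
    task_model["bpmn_process"] = bpmn_process
    return task_model


print(update_task_model("abc-123"))
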
@@ -24,7 +24,7 @@ from spiffworkflow_backend.services.assertion_service import safe_assertion
 from spiffworkflow_backend.services.process_instance_lock_service import (
     ProcessInstanceLockService,
 )
-from spiffworkflow_backend.services.task_service import TaskService
+from spiffworkflow_backend.services.task_service import StartAndEndTimes, TaskService


 class EngineStepDelegate:
@@ -62,10 +62,11 @@ class TaskModelSavingDelegate(EngineStepDelegate):
         self.serializer = serializer

         self.current_task_model: Optional[TaskModel] = None
-        self.current_task_start_in_seconds: Optional[float] = None
+        # self.current_task_start_in_seconds: Optional[float] = None

         self.last_completed_spiff_task: Optional[SpiffTask] = None
         self.spiff_tasks_to_process: Set[UUID] = set()
+        self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}

         self.task_service = TaskService(
             process_instance=self.process_instance,
@@ -75,10 +76,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):

     def will_complete_task(self, spiff_task: SpiffTask) -> None:
         if self._should_update_task_model():
-            # if spiff_task.task_spec.name == 'passing_script_task':
-            # import pdb; pdb.set_trace()
-            # print("HEY1")
-            self.current_task_start_in_seconds = time.time()
+            self.spiff_task_timestamps[spiff_task.id] = {'start_in_seconds': time.time(), 'end_in_seconds': None}
             spiff_task.task_spec._predict(spiff_task, mask=TaskState.NOT_FINISHED_MASK)
         if self.secondary_engine_step_delegate:
             self.secondary_engine_step_delegate.will_complete_task(spiff_task)
@@ -88,15 +86,16 @@ class TaskModelSavingDelegate(EngineStepDelegate):
             # if spiff_task.task_spec.name == 'test_process_to_call_script.BoundaryEventParent':
             # import pdb; pdb.set_trace()
             # print("HEY")
-            task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
-            if self.current_task_start_in_seconds is None:
-                raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
-            task_model.start_in_seconds = self.current_task_start_in_seconds
-            task_model.end_in_seconds = time.time()
+            # task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
+            # if self.current_task_start_in_seconds is None:
+            #     raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
+            # task_model.start_in_seconds = self.current_task_start_in_seconds
+            # task_model.end_in_seconds = time.time()
+            self.spiff_task_timestamps[spiff_task.id]['end_in_seconds'] = time.time()
             self.last_completed_spiff_task = spiff_task
             self.spiff_tasks_to_process.add(spiff_task.id)
             self._add_children(spiff_task)
-            self._add_parents(spiff_task)
+            # self._add_parents(spiff_task)

             # self.task_service.process_spiff_task_parent_subprocess_tasks(spiff_task)
             # self.task_service.process_spiff_task_children(spiff_task)
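
Read together with the __init__ change above, will_complete_task and did_complete_task now record a per-task timestamp pair in spiff_task_timestamps instead of the single current_task_start_in_seconds attribute. A minimal sketch of that pattern (not part of the diff), assuming plain functions in place of the delegate methods:

import time
import uuid
from typing import Optional, TypedDict


class StartAndEndTimes(TypedDict):
    start_in_seconds: Optional[float]
    end_in_seconds: Optional[float]


spiff_task_timestamps: dict[uuid.UUID, StartAndEndTimes] = {}


def will_complete_task(task_id: uuid.UUID) -> None:
    # record the start; the end is not known yet
    spiff_task_timestamps[task_id] = {"start_in_seconds": time.time(), "end_in_seconds": None}


def did_complete_task(task_id: uuid.UUID) -> None:
    # close out the pair; it is later passed as start_and_end_times to update_task_model_with_spiff_task
    spiff_task_timestamps[task_id]["end_in_seconds"] = time.time()


task_id = uuid.uuid4()
will_complete_task(task_id)
did_complete_task(task_id)
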
@@ -134,12 +133,13 @@ class TaskModelSavingDelegate(EngineStepDelegate):
             # for waiting_spiff_task in bpmn_process_instance.get_tasks(
             #     TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY | TaskState.FUTURE
             # ):
-            for spiff_task_guid in self.spiff_tasks_to_process:
-                if spiff_task_guid is None:
+            #     self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
+            for spiff_task_uuid in self.spiff_tasks_to_process:
+                if spiff_task_uuid is None:  # or str(spiff_task_uuid) in self.task_service.task_models:
                     continue
                 try:
-                    print(f"spiff_task_guid: {spiff_task_guid}")
-                    waiting_spiff_task = bpmn_process_instance.get_task_from_id(spiff_task_guid)
+                    # print(f"spiff_task_uuid: {spiff_task_uuid}")
+                    waiting_spiff_task = bpmn_process_instance.get_task_from_id(spiff_task_uuid)
                 except TaskNotFoundException:
                     continue
                 # if waiting_spiff_task.task_spec.name == 'top_level_manual_task_two':
@@ -152,15 +152,14 @@ class TaskModelSavingDelegate(EngineStepDelegate):
                     if cpt.id == waiting_spiff_task.id:
                         waiting_spiff_task.parent.children.remove(cpt)
                         continue
-                try:
-                    self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
-                except Exception as ex:
-                    import pdb; pdb.set_trace()
-                    print("HEY16")
+                start_and_end_times = None
+                if waiting_spiff_task.id in self.spiff_task_timestamps:
+                    start_and_end_times = self.spiff_task_timestamps[waiting_spiff_task.id]
+                self.task_service.update_task_model_with_spiff_task(waiting_spiff_task, start_and_end_times=start_and_end_times)
                 # self.task_service.process_spiff_task_parent_subprocess_tasks(waiting_spiff_task)

-            # if self.last_completed_spiff_task is not None:
-            # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
+            if self.last_completed_spiff_task is not None:
+                self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
                 # self.task_service.process_spiff_task_children(self.last_completed_spiff_task)

     def _should_update_task_model(self) -> bool:
|
@ -286,7 +285,6 @@ class WorkflowExecutionService:
|
||||||
self.process_bpmn_messages()
|
self.process_bpmn_messages()
|
||||||
self.queue_waiting_receive_messages()
|
self.queue_waiting_receive_messages()
|
||||||
except SpiffWorkflowException as swe:
|
except SpiffWorkflowException as swe:
|
||||||
raise swe
|
|
||||||
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
|
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@@ -59,6 +59,7 @@ class TestLoggingService(BaseTest):
         assert log_response.status_code == 200
         assert log_response.json
         logs: list = log_response.json["results"]
+        import pdb; pdb.set_trace()
         assert len(logs) == 4

         for log in logs: