diff --git a/migrations/versions/6ea93d3b0d76_.py b/migrations/versions/e093f2840fcd_.py similarity index 98% rename from migrations/versions/6ea93d3b0d76_.py rename to migrations/versions/e093f2840fcd_.py index 0263d642..48670f58 100644 --- a/migrations/versions/6ea93d3b0d76_.py +++ b/migrations/versions/e093f2840fcd_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 6ea93d3b0d76 +Revision ID: e093f2840fcd Revises: -Create Date: 2022-08-10 13:12:22.289229 +Create Date: 2022-08-11 15:42:44.848283 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '6ea93d3b0d76' +revision = 'e093f2840fcd' down_revision = None branch_labels = None depends_on = None @@ -149,7 +149,7 @@ def upgrade(): sa.ForeignKeyConstraint(['user_uid'], ['user.uid'], ), sa.PrimaryKeyConstraint('id') ) - op.create_table('queued_send_message', + op.create_table('message_instance', sa.Column('id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False), sa.Column('message_model_id', sa.Integer(), nullable=False), @@ -201,7 +201,7 @@ def upgrade(): sa.Column('message_instance_id', sa.Integer(), nullable=False), sa.Column('name', sa.String(length=50), nullable=False), sa.Column('value', sa.String(length=50), nullable=False), - sa.ForeignKeyConstraint(['message_instance_id'], ['queued_send_message.id'], ), + sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique') ) @@ -219,7 +219,7 @@ def downgrade(): op.drop_table('message_correlation') op.drop_table('data_store') op.drop_table('task_event') - op.drop_table('queued_send_message') + op.drop_table('message_instance') op.drop_table('file') op.drop_table('active_task') op.drop_table('user_group_assignment') diff --git a/src/spiffworkflow_backend/models/message_instance.py b/src/spiffworkflow_backend/models/message_instance.py index 4160b791..2164e591 100644 --- a/src/spiffworkflow_backend/models/message_instance.py +++ b/src/spiffworkflow_backend/models/message_instance.py @@ -35,7 +35,7 @@ class MessageStatuses(enum.Enum): class MessageInstanceModel(SpiffworkflowBaseDBModel): """Messages from a process instance that are ready to send to a receiving task.""" - __tablename__ = "queued_send_message" + __tablename__ = "message_instance" id = db.Column(db.Integer, primary_key=True) process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore diff --git a/src/spiffworkflow_backend/services/message_service.py b/src/spiffworkflow_backend/services/message_service.py index 2e17a05b..c3265486 100644 --- a/src/spiffworkflow_backend/services/message_service.py +++ b/src/spiffworkflow_backend/services/message_service.py @@ -51,7 +51,7 @@ class MessageService: for queued_message_send in queued_messages_send: # check again in case another background process picked up the message # while the previous one was running - if queued_message_send.status != "receive": + if queued_message_send.status != "ready": continue queued_message_send.status = "running" @@ -84,6 +84,7 @@ class MessageService: db.session.add(queued_message_receive) db.session.commit() + raise exception def process_message_receive( self, @@ -101,17 +102,18 @@ class MessageService: ) processor_send = ProcessInstanceProcessor(process_instance_send) - spiff_task_send = processor_send.bpmn_process_instance.get_task_by_id( + spiff_task_send = processor_send.get_task_by_id( queued_message_send.bpmn_element_id ) + print(f"queued_message_send.bpmn_element_id: {queued_message_send.bpmn_element_id}") if spiff_task_send is None: raise MessageServiceError( "Processor failed to obtain task.", ) - message_event_send = MessageEventDefinition( - spiff_task_send.id, payload=spiff_task_send.payload - ) + # message_event_send = MessageEventDefinition( + # spiff_task_send.id, payload=spiff_task_send.payload + # ) process_instance_receive = ProcessInstanceModel.query.filter_by( id=queued_message_receive.process_instance_id @@ -125,7 +127,8 @@ class MessageService: ) processor_receive = ProcessInstanceProcessor(process_instance_receive) - processor_receive.bpmn_process_instance.catch(message_event_send) + import pdb; pdb.set_trace() + processor_receive.bpmn_process_instance.catch_bpmn_message(spiff_task_send.id, spiff_task_send.payload) def get_queued_message_receive( self, diff --git a/src/spiffworkflow_backend/services/process_instance_processor.py b/src/spiffworkflow_backend/services/process_instance_processor.py index d616ca6f..91259ef9 100644 --- a/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/src/spiffworkflow_backend/services/process_instance_processor.py @@ -78,22 +78,22 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore """Evaluate.""" return self._evaluate(expression, task.data, task) - def _evaluate( - self, - expression: str, - context: Dict[str, Union[Box, str]], - task: Optional[SpiffTask] = None, - _external_methods: None = None, - ) -> Any: - """Evaluate the given expression, within the context of the given task and return the result.""" - try: - return super()._evaluate(expression, context, task, {}) - except Exception as exception: - raise WorkflowTaskExecException( - task, - "Error evaluating expression " - "'%s', %s" % (expression, str(exception)), - ) from exception + # def _evaluate( + # self, + # expression: str, + # context: Dict[str, Union[Box, str]], + # task: Optional[SpiffTask] = None, + # _external_methods: None = None, + # ) -> Any: + # """Evaluate the given expression, within the context of the given task and return the result.""" + # try: + # return super()._evaluate(expression, context, task, {}) + # except Exception as exception: + # raise WorkflowTaskExecException( + # task, + # "Error evaluating expression " + # "'%s', %s" % (expression, str(exception)), + # ) from exception def execute( self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]] @@ -487,73 +487,73 @@ class ProcessInstanceProcessor: """Get_status.""" return self.status_of(self.bpmn_process_instance) - def process_bpmn_events(self) -> None: - """Process_bpmn_events.""" - if self.bpmn_process_instance.bpmn_events: - for bpmn_event in self.bpmn_process_instance.bpmn_events: - message_type = None + def process_bpmn_messages(self) -> None: + """Process_bpmn_messages.""" + for bpmn_message in self.bpmn_process_instance.get_bpmn_messages(): + print("WE PROCESS") + message_type = None - # TODO: message: who knows the name of the message model? - # will it be in the bpmn_event? - message_model = MessageModel.query.filter_by( - name=bpmn_event.message_name - ).first() + # TODO: message: who knows the name of the message model? + # will it be in the bpmn_message? + message_model = MessageModel.query.filter_by( + name=bpmn_message.message_name + ).first() - if message_model is None: - raise ApiError( - "invalid_message_name", - f"Invalid message name: {bpmn_event.message_name}.", - ) - - # TODO: message - not sure how to determine message types yet - if bpmn_event.event == "WaitEvent": # and waiting for message: - message_type = "receive" - elif bpmn_event.event == "SendEvent": - message_type = "send" - - if message_type is None: - raise ApiError( - "invalid_event_type", - f"Invalid event type for a message: {bpmn_event.event}.", - ) - - if not bpmn_event.message_correlations: - raise ApiError( - "message_correlations_missing", - f"Could not find any message correlations bpmn_event: {bpmn_event}", - ) - - for message_correlation in bpmn_event.message_correlations: - message_correlation_property = ( - MessageCorrelationPropertyModel.query.filter_by( - message_model_id=message_model.id, - identifier=message_correlation.identifier, - ).first() - ) - if message_correlation_property is None: - raise ApiError( - "message_correlations_missing_from_process", - f"Could not find a known message correlation with identifier: {message_correlation.identifier}", - ) - - message_instance = MessageInstanceModel( - process_instance_id=self.process_instance_model.id, - bpmn_element_id=bpmn_event.task_name, - message_type=message_type, - message_model_id=message_model.id, + if message_model is None: + raise ApiError( + "invalid_message_name", + f"Invalid message name: {bpmn_message.message_name}.", ) - db.session.add(message_instance) - db.session.commit() - # TODO: find out what spiff will call the correlations - for message_correlation in bpmn_event.message_correlations: - message_correlation = MessageCorrelationModel( - message_instance_id=message_instance.id, - name=message_correlation.name, - value=message_correlation.value, + # TODO: message - not sure how to determine message types yet + if bpmn_message.event == "WaitEvent": # and waiting for message: + message_type = "receive" + elif bpmn_message.event == "SendEvent": + message_type = "send" + + if message_type is None: + raise ApiError( + "invalid_event_type", + f"Invalid event type for a message: {bpmn_message.event}.", + ) + + if not bpmn_message.message_correlations: + raise ApiError( + "message_correlations_missing", + f"Could not find any message correlations bpmn_message: {bpmn_message}", + ) + + for message_correlation in bpmn_message.message_correlations: + message_correlation_property = ( + MessageCorrelationPropertyModel.query.filter_by( + message_model_id=message_model.id, + identifier=message_correlation.identifier, + ).first() + ) + if message_correlation_property is None: + raise ApiError( + "message_correlations_missing_from_process", + f"Could not find a known message correlation with identifier: {message_correlation.identifier}", ) - db.session.add(message_correlation) - db.session.commit() + + message_instance = MessageInstanceModel( + process_instance_id=self.process_instance_model.id, + bpmn_element_id=bpmn_message.task_name, + message_type=message_type, + message_model_id=message_model.id, + ) + db.session.add(message_instance) + db.session.commit() + + # TODO: find out what spiff will call the correlations + for message_correlation in bpmn_message.message_correlations: + message_correlation = MessageCorrelationModel( + message_instance_id=message_instance.id, + name=message_correlation.name, + value=message_correlation.value, + ) + db.session.add(message_correlation) + db.session.commit() def do_engine_steps(self, exit_at: None = None) -> None: """Do_engine_steps.""" @@ -561,7 +561,7 @@ class ProcessInstanceProcessor: self.bpmn_process_instance.refresh_waiting_tasks() self.bpmn_process_instance.do_engine_steps(exit_at=exit_at) # TODO: run this - # self.process_bpmn_events() + self.process_bpmn_messages() except WorkflowTaskExecException as we: raise ApiError.from_workflow_exception("task_error", str(we), we) from we @@ -725,7 +725,10 @@ class ProcessInstanceProcessor: def get_task_by_id(self, task_id: str) -> SpiffTask: """Get_task_by_id.""" all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) - return [t for t in all_tasks if t.id == task_id] + for task in all_tasks: + if task.id == task_id: + return task + return None def get_nav_item(self, task: SpiffTask) -> Any: """Get_nav_item.""" diff --git a/tests/spiffworkflow_backend/helpers/base_test.py b/tests/spiffworkflow_backend/helpers/base_test.py index 838ad267..fc50a3a2 100644 --- a/tests/spiffworkflow_backend/helpers/base_test.py +++ b/tests/spiffworkflow_backend/helpers/base_test.py @@ -70,7 +70,6 @@ class BaseTest: updated_at_in_seconds=round(time.time()), start_in_seconds=current_time - (3600 * 1), end_in_seconds=current_time - (3600 * 1 - 20), - bpmn_json=json.dumps({"ikey": "ivalue"}), ) db.session.add(process_instance) db.session.commit() diff --git a/tests/spiffworkflow_backend/unit/test_message_service.py b/tests/spiffworkflow_backend/unit/test_message_service.py index 6c992d75..a4faca6a 100644 --- a/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/tests/spiffworkflow_backend/unit/test_message_service.py @@ -1,4 +1,5 @@ """Test_message_service.""" +from spiffworkflow_backend.services.message_service import MessageService from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec @@ -77,4 +78,6 @@ class TestMessageService(BaseTest): db.session.add(message_correlation_two_receive) db.session.commit() - # MessageService().process_queued_messages() + MessageService().process_queued_messages() + print(queued_message_send.failure_cause) + print(queued_message_send.status)