Feature/event payloads (#393)

* some initial changes for event payload changes in spiff

* fixed tests for new spiffworkflow with event payloads w/ burnettk essweine

* pyl w/ burnettk essweine

* updated SpiffWorkflow from branch

* switched SpiffWorkflow back to main w/ burnettk

* added base for migration script to upgrade db w/ burnettk essweine

* some updates to script w/ burnettk

* script has been written, needs to be tested

* pyl w/ burnettk

* updates to migration script so it can work w/ burnettk

* pyl w/ burnettk

* added comment to data migration file

* run the version 1 3 migration on app boot w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-07-18 09:56:24 -04:00 committed by GitHub
parent 144ae9f927
commit 4cf60acb27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 309 additions and 40 deletions

View File

@ -38,7 +38,7 @@ function run_autofixers() {
asdf reshim python
fi
python_dirs=$(get_python_dirs)
python_dirs="$(get_python_dirs) bin"
ruff --fix $python_dirs || echo ''
}

View File

@ -88,6 +88,10 @@ if [[ -z "${SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER:-}" ]]; then
export SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER
fi
# DELETE after this runs on all necessary environments
# TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations
poetry run python ./bin/data_migrations/version_1_3.py
# --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh
export IS_GUNICORN="true"
# THIS MUST BE THE LAST COMMAND!

View File

@ -0,0 +1,203 @@
import copy
import json
import os
import uuid
from hashlib import sha256
from spiffworkflow_backend import create_app
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from sqlalchemy import or_
from sqlalchemy.orm.attributes import flag_modified
class VersionOneThree:
    """Migrates data in the database to be compatible with SpiffWorkflow at git revision ebcdde95.

    Converts migration file from SpiffWorkflow to work with backend's db:
    https://github.com/sartography/SpiffWorkflow/blob/main/SpiffWorkflow/bpmn/serializer/migration/version_1_3.py
    """

    def run(self) -> None:
        """Run the full data migration inside a minimal flask app context.

        Rewrites every _BoundaryEventParent task definition (and its task models)
        into the new BoundaryEventSplit/BoundaryEventJoin pair, then updates
        event definitions on all event-like task definitions. Commits once at
        the end so the migration is all-or-nothing.
        """
        os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "local_development"
        if os.environ.get("SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR") is None:
            # create_app requires this to be set; the value itself is not used here
            os.environ["SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR"] = "hey"
        flask_env_key = "FLASK_SESSION_SECRET_KEY"
        os.environ[flask_env_key] = "whatevs"
        app = create_app()
        with app.app_context():
            task_definitions = self.get_relevant_task_definitions()
            for task_definition in task_definitions:
                self.process_task_definition(task_definition)
                relating_task_models = TaskModel.query.filter_by(task_definition_id=task_definition.id).all()
                for task_model in relating_task_models:
                    self.process_task_model(task_model, task_definition)
            # SendTask/ReceiveTask carry event definitions too, but their typename
            # does not contain "Event", hence the explicit in_ clause.
            task_definitions_with_events = TaskDefinitionModel.query.filter(
                or_(
                    TaskDefinitionModel.typename.like("%Event%"),  # type: ignore
                    TaskDefinitionModel.typename.in_(["SendTask", "ReceiveTask"]),  # type: ignore
                )
            ).all()
            for tdwe in task_definitions_with_events:
                self.update_event_definitions(tdwe)
            db.session.commit()

    def get_relevant_task_definitions(self) -> list[TaskDefinitionModel]:
        """Return all task definitions using the removed _BoundaryEventParent typename."""
        task_definitions: list[TaskDefinitionModel] = TaskDefinitionModel.query.filter_by(
            typename="_BoundaryEventParent"
        ).all()
        return task_definitions

    def process_task_definition(self, task_definition: TaskDefinitionModel) -> None:
        """Convert a _BoundaryEventParent definition into a BoundaryEventSplit plus a new BoundaryEventJoin.

        Also rewires the inputs/outputs of the parent and child task definitions
        so they reference the renamed split and the newly created join.
        """
        task_definition.typename = "BoundaryEventSplit"
        task_definition.bpmn_identifier = task_definition.bpmn_identifier.replace(
            "BoundaryEventParent", "BoundaryEventSplit"
        )

        properties_json = copy.copy(task_definition.properties_json)
        # main_child_task_spec no longer exists on the new task spec
        properties_json.pop("main_child_task_spec")
        properties_json["typename"] = task_definition.typename
        properties_json["name"] = task_definition.bpmn_identifier
        task_definition.properties_json = properties_json
        flag_modified(task_definition, "properties_json")  # type: ignore
        db.session.add(task_definition)

        # The join collects all outputs of the split; lookahead/cancel values
        # mirror the defaults SpiffWorkflow uses for BoundaryEventJoin.
        join_properties_json = {
            "name": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
            "manual": False,
            "bpmn_id": None,
            "lookahead": 2,
            "inputs": properties_json["outputs"],
            "outputs": [],
            "split_task": task_definition.bpmn_identifier,
            "threshold": None,
            "cancel": True,
            "typename": "BoundaryEventJoin",
        }
        join_task_definition = TaskDefinitionModel(
            bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            bpmn_identifier=join_properties_json["name"],
            typename=join_properties_json["typename"],
            properties_json=join_properties_json,
        )
        db.session.add(join_task_definition)

        # Parents of the old BoundaryEventParent must now point at the split.
        for parent_bpmn_identifier in properties_json["inputs"]:
            parent_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=parent_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            parent_task_definition.properties_json["outputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in parent_task_definition.properties_json["outputs"]
            ]
            flag_modified(parent_task_definition, "properties_json")  # type: ignore
            db.session.add(parent_task_definition)

        # Children gain the join as an extra output and have their inputs renamed.
        for child_bpmn_identifier in properties_json["outputs"]:
            child_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=child_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            child_task_definition.properties_json["outputs"].append(join_task_definition.bpmn_identifier)
            child_task_definition.properties_json["inputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in child_task_definition.properties_json["inputs"]
            ]
            flag_modified(child_task_definition, "properties_json")  # type: ignore
            db.session.add(child_task_definition)

    def process_task_model(self, task_model: TaskModel, task_definition: TaskDefinitionModel) -> None:
        """Migrate one task model of a converted split: repoint its spec and add join task models.

        For every (non-cancelled) child of the split task a new join task model
        is created whose state is derived from the child's state.
        """
        task_model.properties_json["task_spec"] = task_definition.bpmn_identifier
        flag_modified(task_model, "properties_json")  # type: ignore
        db.session.add(task_model)

        child_task_models = []
        all_children_completed = True

        # Ruff keeps complaining unless it's done like this
        blank_json = json.dumps({})
        blank_json_data_hash = sha256(blank_json.encode("utf8")).hexdigest()

        # Gather surviving children and determine whether all of them finished.
        for child_task_guid in task_model.properties_json["children"]:
            child_task_model = TaskModel.query.filter_by(guid=child_task_guid).first()
            if child_task_model is None:
                continue
            if child_task_model.state not in ["COMPLETED", "CANCELLED"]:
                all_children_completed = False
            child_task_models.append(child_task_model)

        for child_task_model in child_task_models:
            if child_task_model.state == "CANCELLED":
                # Cancelled tasks don't have children
                continue

            new_task_state = None
            start_in_seconds = child_task_model.start_in_seconds
            end_in_seconds = None

            if child_task_model.state in ["MAYBE", "LIKELY", "FUTURE"]:
                new_task_state = child_task_model.state
            elif child_task_model.state in ["WAITING", "READY", "STARTED"]:
                new_task_state = "FUTURE"
            elif child_task_model.state == "COMPLETED":
                # The join only completes once every sibling has completed.
                if all_children_completed:
                    new_task_state = "COMPLETED"
                    end_in_seconds = child_task_model.end_in_seconds
                else:
                    new_task_state = "WAITING"
            elif child_task_model.state == "ERROR":
                new_task_state = "WAITING"
            else:
                # NOTE: was child_task_model.guild, which would raise AttributeError
                # instead of reporting the unknown state.
                raise Exception(f"Unknown state: {child_task_model.state} for {child_task_model.guid}")

            new_task_properties_json = {
                "id": str(uuid.uuid4()),
                "parent": child_task_model.guid,
                "children": [],
                "state": Task.task_state_name_to_int(new_task_state),
                "task_spec": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
                "last_state_change": None,
                "triggered": False,
                "internal_data": {},
            }
            new_task_model = TaskModel(
                guid=new_task_properties_json["id"],
                bpmn_process_id=task_model.bpmn_process_id,
                process_instance_id=task_model.process_instance_id,
                task_definition_id=task_model.task_definition_id,
                state=new_task_state,
                properties_json=new_task_properties_json,
                start_in_seconds=start_in_seconds,
                end_in_seconds=end_in_seconds,
                json_data_hash=blank_json_data_hash,
                python_env_data_hash=blank_json_data_hash,
            )
            db.session.add(new_task_model)

            child_task_model.properties_json["children"].append(new_task_model.guid)
            flag_modified(child_task_model, "properties_json")  # type: ignore
            db.session.add(child_task_model)

    def update_event_definitions(self, task_definition: TaskDefinitionModel) -> None:
        """Rewrite a task definition's event_definition to the new payload format.

        Drops the removed internal/external flags and renames
        escalation_code/error_code to the unified 'code' key.
        """
        if "event_definition" in task_definition.properties_json:
            properties_json = copy.copy(task_definition.properties_json)
            properties_json["event_definition"].pop("internal", None)
            properties_json["event_definition"].pop("external", None)
            if "escalation_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop(
                    "escalation_code"
                )
            if "error_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop("error_code")
            task_definition.properties_json = properties_json
            flag_modified(task_definition, "properties_json")  # type: ignore
            db.session.add(task_definition)
VersionOneThree().run()

View File

@ -0,0 +1,32 @@
"""empty message
Revision ID: 214e0c5fb418
Revises: 64adf34a98db
Create Date: 2023-07-11 14:52:15.825136
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '214e0c5fb418'
down_revision = '64adf34a98db'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add the nullable JSON column `correlation_key_names` to message_instance_correlation_rule.

    batch_alter_table keeps the migration portable to backends with limited
    ALTER TABLE support (e.g. SQLite).
    """
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
        batch_op.add_column(sa.Column('correlation_key_names', sa.JSON(), nullable=True))
    # ### end Alembic commands ###
def downgrade() -> None:
    """Drop the `correlation_key_names` column, reversing upgrade()."""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
        batch_op.drop_column('correlation_key_names')
    # ### end Alembic commands ###

View File

@ -2267,7 +2267,7 @@ lxml = "*"
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "9bd018e596d42b897da67ff894f458d6bc40bdf9"
resolved_reference = "ebcdde95c2f59f67981add1eacf9f5e04520d50f"
[[package]]
name = "sqlalchemy"

View File

@ -29,8 +29,11 @@ class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
message_instance_id = db.Column(ForeignKey(MessageInstanceModel.id), nullable=False, index=True) # type: ignore
name: str = db.Column(db.String(50), nullable=False, index=True)
retrieval_expression: str = db.Column(db.String(255))
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
correlation_key_names: list = db.Column(db.JSON)
message_instance = relationship("MessageInstanceModel", back_populates="correlation_rules")

View File

@ -1,4 +1,6 @@
from SpiffWorkflow.bpmn.workflow import BpmnMessage # type: ignore
from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty # type: ignore
from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition # type: ignore
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageStatuses
@ -72,8 +74,7 @@ class MessageService:
cls.process_message_receive(
receiving_process,
message_instance_receive,
message_instance_send.name,
message_instance_send.payload,
message_instance_send,
)
message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id
@ -130,7 +131,7 @@ class MessageService:
(
(
"Process instance cannot be found for queued message:"
f" {message_instance_receive.id}.Tried with id"
f" {message_instance_receive.id}. Tried with id"
f" {message_instance_receive.process_instance_id}"
),
)
@ -141,16 +142,28 @@ class MessageService:
def process_message_receive(
process_instance_receive: ProcessInstanceModel,
message_instance_receive: MessageInstanceModel,
message_model_name: str,
message_payload: dict,
message_instance_send: MessageInstanceModel,
) -> None:
bpmn_message = BpmnMessage(
None,
message_model_name,
message_payload,
correlation_properties = []
for cr in message_instance_receive.correlation_rules:
correlation_properties.append(
CorrelationProperty(
name=cr.name,
retrieval_expression=cr.retrieval_expression,
correlation_keys=cr.correlation_key_names,
)
)
bpmn_message = MessageEventDefinition(
name=message_instance_send.name,
correlation_properties=correlation_properties,
)
bpmn_event = BpmnEvent(
event_definition=bpmn_message,
payload=message_instance_send.payload,
correlations=message_instance_send.correlation_keys,
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch_bpmn_message(bpmn_message)
processor_receive.bpmn_process_instance.send_event(bpmn_event)
processor_receive.do_engine_steps(save=True)
message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive)

View File

@ -24,6 +24,7 @@ from flask import current_app
from lxml import etree # type: ignore
from lxml.etree import XMLSyntaxError # type: ignore
from RestrictedPython import safe_globals # type: ignore
from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
@ -1083,10 +1084,12 @@ class ProcessInstanceProcessor:
"""Send an event to the workflow."""
payload = event_data.pop("payload", None)
event_definition = self._event_serializer.registry.restore(event_data)
if payload is not None:
event_definition.payload = payload
bpmn_event = BpmnEvent(
event_definition=event_definition,
payload=payload,
)
try:
self.bpmn_process_instance.catch(event_definition)
self.bpmn_process_instance.send_event(bpmn_event)
except Exception as e:
print(e)

View File

@ -10,8 +10,8 @@ from urllib.parse import unquote
import sentry_sdk
from flask import current_app
from flask import g
from SpiffWorkflow.bpmn.specs.control import _BoundaryEventParent # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from spiffworkflow_backend import db
from spiffworkflow_backend.exceptions.api_error import ApiError
@ -170,7 +170,7 @@ class ProcessInstanceService:
@classmethod
def ready_user_task_has_associated_timer(cls, processor: ProcessInstanceProcessor) -> bool:
for ready_user_task in processor.bpmn_process_instance.get_ready_user_tasks():
if isinstance(ready_user_task.parent.task_spec, _BoundaryEventParent):
if isinstance(ready_user_task.parent.task_spec, BoundaryEventSplit):
return True
return False

View File

@ -9,6 +9,7 @@ from uuid import UUID
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
@ -433,15 +434,21 @@ class WorkflowExecutionService:
self.process_instance_saver()
def process_bpmn_messages(self) -> None:
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# FIXME: get_events clears out the events so if we have other events we care about
# this will clear them out as well.
# Right now we only care about messages though.
bpmn_events = self.bpmn_process_instance.get_events()
for bpmn_event in bpmn_events:
if not isinstance(bpmn_event.event_definition, MessageEventDefinition):
continue
bpmn_message = bpmn_event.event_definition
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id,
# TODO: use the correct swimlane user when that is set up
message_type="send",
name=bpmn_message.name,
payload=bpmn_message.payload,
payload=bpmn_event.payload,
correlation_keys=self.bpmn_process_instance.correlations,
)
db.session.add(message_instance)
@ -451,21 +458,21 @@ class WorkflowExecutionService:
bpmn_process_correlations = self.bpmn_process_instance.correlations
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
# update correlations correctly but always null out bpmn_messages since they get cleared out later
bpmn_process.properties_json["bpmn_messages"] = []
bpmn_process.properties_json["bpmn_events"] = []
db.session.add(bpmn_process)
db.session.commit()
def queue_waiting_receive_messages(self) -> None:
waiting_events = self.bpmn_process_instance.waiting_events()
waiting_message_events = filter(lambda e: e["event_type"] == "MessageEventDefinition", waiting_events)
waiting_message_events = filter(lambda e: e.event_type == "MessageEventDefinition", waiting_events)
for event in waiting_message_events:
# Ensure we are only creating one message instance for each waiting message
if (
MessageInstanceModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
message_type="receive",
name=event["name"],
name=event.name,
).count()
> 0
):
@ -476,14 +483,15 @@ class WorkflowExecutionService:
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id,
message_type="receive",
name=event["name"],
name=event.name,
correlation_keys=self.bpmn_process_instance.correlations,
)
for correlation_property in event["value"]:
for correlation_property in event.value:
message_correlation = MessageInstanceCorrelationRuleModel(
message_instance=message_instance,
name=correlation_property.name,
retrieval_expression=correlation_property.retrieval_expression,
correlation_key_names=correlation_property.correlation_keys,
)
db.session.add(message_correlation)
db.session.add(message_instance)

View File

@ -6,17 +6,17 @@ from SpiffWorkflow.bpmn.parser.util import full_tag # type: ignore
from SpiffWorkflow.bpmn.serializer.task_spec import EventConverter # type: ignore
from SpiffWorkflow.bpmn.serializer.task_spec import StartEventConverter as DefaultStartEventConverter
from SpiffWorkflow.bpmn.specs.defaults import StartEvent as DefaultStartEvent # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions import CycleTimerEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions import DurationTimerEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions import TimeDateEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.simple import NoneEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.timer import CycleTimerEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.timer import DurationTimerEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimeDateEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition
from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
StartConfiguration = tuple[int, int, int]
# TODO: cylce timers and repeat counts?
# TODO: cycle timers and repeat counts?
class StartEvent(DefaultStartEvent): # type: ignore

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="test_process_to_call" name="Test Process To Call" isExecutable="true">
<bpmn:endEvent id="Event_03zsjvn">
<bpmn:incoming>Flow_089aeua</bpmn:incoming>
@ -51,9 +51,14 @@
<bpmn:script>set_in_test_process_to_call_script = 1</bpmn:script>
</bpmn:scriptTask>
<bpmn:boundaryEvent id="our_boundary_event" name="our_boundary_event" attachedToRef="test_process_to_call_script">
<bpmn:escalationEventDefinition id="EscalationEventDefinition_1bs7saf" />
<bpmn:escalationEventDefinition id="EscalationEventDefinition_0t7834v" escalationRef="Escalation_18qf8th" errorRef="[object Object]" />
</bpmn:boundaryEvent>
</bpmn:process>
<bpmn:escalation id="Escalation_18qf8th" name="Our Escalation" errorCode="26">
<bpmn:extensionElements>
<spiffworkflow:variableName>the_var</spiffworkflow:variableName>
</bpmn:extensionElements>
</bpmn:escalation>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="test_process_to_call">
<bpmndi:BPMNShape id="Event_03zsjvn_di" bpmnElement="Event_03zsjvn">
@ -69,10 +74,10 @@
<dc:Bounds x="450" y="110" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0wfnf83_di" bpmnElement="our_boundary_event">
<dc:Bounds x="492" y="172" width="36" height="36" />
<bpmndi:BPMNShape id="Event_0bjx9wm_di" bpmnElement="our_boundary_event">
<dc:Bounds x="502" y="172" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="467" y="215" width="87" height="27" />
<dc:Bounds x="477" y="215" width="87" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1qsx5et_di" bpmnElement="Flow_1qsx5et">

View File

@ -2412,8 +2412,6 @@ class TestProcessApi(BaseTest):
data: dict = {
"correlation_properties": [],
"expression": None,
"external": True,
"internal": False,
"payload": {"message": "message 1"},
"name": "Message 1",
"typename": "MessageEventDefinition",