burnettk 7ac4b51c6a Squashed 'SpiffWorkflow/' changes from a6392d1906..8d820dce1f
8d820dce1f Track spiff step details more granularly (#17)
426da26d8f Clear the remaining __init__.py imports in SpiffWorkflow (#14)
9a1d1c484a Fix FutureWarning in SpiffWorkflow (#16)

git-subtree-dir: SpiffWorkflow
git-subtree-split: 8d820dce1f439bb76bc07e39629832d998d6f634
2022-11-04 09:33:42 -04:00

87 lines
3.6 KiB
Python

from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
from .util import first
DEFAULT_NSMAP = {
'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL',
'bpmndi': 'http://www.omg.org/spec/BPMN/20100524/DI',
'dc': 'http://www.omg.org/spec/DD/20100524/DC',
}
CAMUNDA_MODEL_NS = 'http://camunda.org/schema/1.0/bpmn'
class NodeParser:
def __init__(self, node, nsmap=None, filename=None, lane=None):
self.node = node
self.nsmap = nsmap or DEFAULT_NSMAP
self.filename = filename
self.lane = self._get_lane() or lane
self.position = self._get_position() or {'x': 0.0, 'y': 0.0}
def get_id(self):
return self.node.get('id')
def xpath(self, xpath, extra_ns=None):
return self._xpath(self.node, xpath, extra_ns)
def doc_xpath(self, xpath, extra_ns=None):
root = self.node.getroottree().getroot()
return self._xpath(root, xpath, extra_ns)
def parse_condition(self, sequence_flow):
expression = first(self._xpath(sequence_flow, './/bpmn:conditionExpression'))
return expression.text if expression is not None else None
def parse_documentation(self, sequence_flow=None):
node = sequence_flow if sequence_flow is not None else self.node
documentation_node = first(self._xpath(node, './/bpmn:documentation'))
return None if documentation_node is None else documentation_node.text
def parse_incoming_data_references(self):
specs = []
for name in self.xpath('.//bpmn:dataInputAssociation/bpmn:sourceRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
else:
raise ValidationException(f'Cannot resolve dataInputAssociation {name}', self.node, self.filename)
return specs
def parse_outgoing_data_references(self):
specs = []
for name in self.xpath('.//bpmn:dataOutputAssociation/bpmn:targetRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
else:
raise ValidationException(f'Cannot resolve dataOutputAssociation {name}', self.node, self.filename)
return specs
def parse_extensions(self, node=None):
extensions = {}
extra_ns = {'camunda': CAMUNDA_MODEL_NS}
extension_nodes = self.xpath('.//bpmn:extensionElements/camunda:properties/camunda:property', extra_ns)
for ex_node in extension_nodes:
extensions[ex_node.get('name')] = ex_node.get('value')
return extensions
def _get_lane(self):
noderef = first(self.doc_xpath(f".//bpmn:flowNodeRef[text()='{self.get_id()}']"))
if noderef is not None:
return noderef.getparent().get('name')
def _get_position(self):
bounds = first(self.doc_xpath(f".//bpmndi:BPMNShape[@bpmnElement='{self.get_id()}']//dc:Bounds"))
if bounds is not None:
return {'x': float(bounds.get('x', 0)), 'y': float(bounds.get('y', 0))}
def _xpath(self, node, xpath, extra_ns=None):
if extra_ns is not None:
nsmap = self.nsmap.copy()
nsmap.update(extra_ns)
else:
nsmap = self.nsmap
return node.xpath(xpath, namespaces=nsmap)