from jinja2 import Template
from crc.api.common import ApiError
from crc.scripts.script import Script
from import LdapService
from import send_mail
class Email(Script):
"""This Script allows to be introduced as part of a workflow and called from there, specifying
recipients and content """
def get_description(self):
return """
Creates an email, using the provided arguments (a list of UIDs)"
Each argument will be used to look up personal information needed for
the email creation.
Email Subject ApprvlApprvr1 PIComputingID
def do_task_validate_only(self, task, *args, **kwargs):
self.get_emails(task, args)
def do_task(self, task, *args, **kwargs):
subject = self.get_subject(task, args)
recipients = self.get_emails(task, args)
content = self.get_content(task)
if recipients:
subject='Test Subject',
def get_emails(self, task, args):
if len(args) < 1:
raise ApiError(code="missing_argument",
message="Email script requires at least one argument. The "
"name of the variable in the task data that contains user"
"id to process. Multiple arguments are accepted.")
emails = []
for arg in args[1:]:
uid = task.workflow.script_engine.evaluate_expression(task, arg)
user_info = LdapService.user_info(uid)
email = user_info.email_address
if not isinstance(email, str):
raise ApiError(code="invalid_argument",
message="The Email script requires at least 1 UID argument. The "
"name of the variable in the task data that contains subject and"
" user ids to process. This must point to an array or a string, but "
"it currently points to a %s " % emails.__class__.__name__)
return emails
def get_subject(self, task, args):
if len(args) < 1:
raise ApiError(code="missing_argument",
message="Email script requires at least one subject argument. The "
"name of the variable in the task data that contains subject"
" to process. Multiple arguments are accepted.")
subject = task.workflow.script_engine.evaluate_expression(task, args[0])
if not isinstance(subject, str):
raise ApiError(code="invalid_argument",
message="The Email script requires 1 argument. The "
"the name of the variable in the task data that contains user"
"ids to process. This must point to an array or a string, but "
"it currently points to a %s " % emails.__class__.__name__)
return subject
def get_content(self, task):
content = task.task_spec.documentation
template = Template(content)
rendered = template.render({'approver': 'Bossman', 'not_here': 22})
return rendered

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="" xmlns:bpmndi="" xmlns:dc="" xmlns:camunda="" xmlns:di="" id="Definitions_0y2dq4f" targetNamespace="" exporter="Camunda Modeler" exporterVersion="3.7.3">
<bpmn:process id="Process_0tad5ma" name="Set Recipients" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:endEvent id="Event_0izrcj4">
<bpmn:scriptTask id="Activity_0s5v97n" name="Email Recipients">
<bpmn:documentation>Email content to be delivered to {{ approver }}</bpmn:documentation>
<bpmn:script>Email Subject ApprvlApprvr1 PIComputingID</bpmn:script>
<bpmn:sequenceFlow id="Flow_1synsig" sourceRef="StartEvent_1" targetRef="Activity_1l9vih3" />
<bpmn:sequenceFlow id="Flow_1xlrgne" sourceRef="Activity_0s5v97n" targetRef="Event_0izrcj4" />
<bpmn:sequenceFlow id="Flow_08n2npe" sourceRef="Activity_1l9vih3" targetRef="Activity_0s5v97n" />
<bpmn:userTask id="Activity_1l9vih3" name="Set Recipients">
<camunda:formField id="ApprvlApprvr1" label="Approver" type="string" />
<camunda:formField id="PIComputingID" label="Primary Investigator" type="string" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_0tad5ma">
<bpmndi:BPMNEdge id="Flow_1synsig_di" bpmnElement="Flow_1synsig">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
<bpmndi:BPMNEdge id="Flow_1xlrgne_di" bpmnElement="Flow_1xlrgne">
<di:waypoint x="550" y="117" />
<di:waypoint x="662" y="117" />
<bpmndi:BPMNEdge id="Flow_08n2npe_di" bpmnElement="Flow_08n2npe">
<di:waypoint x="370" y="117" />
<di:waypoint x="450" y="117" />
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNShape id="Event_0izrcj4_di" bpmnElement="Event_0izrcj4">
<dc:Bounds x="662" y="99" width="36" height="36" />
<bpmndi:BPMNShape id="Activity_04imfm6_di" bpmnElement="Activity_0s5v97n">
<dc:Bounds x="450" y="77" width="100" height="80" />
<bpmndi:BPMNShape id="Activity_0xugr62_di" bpmnElement="Activity_1l9vih3">
<dc:Bounds x="270" y="77" width="100" height="80" />

from tests.base_test import BaseTest
from import FileService
from import Email
from import WorkflowProcessor
from crc.api.common import ApiError
from crc import db
# from crc.models.approval import ApprovalModel
class TestEmailScript(BaseTest):
def test_do_task(self):
workflow = self.create_workflow('email')
processor = WorkflowProcessor(workflow)
task = processor.next_task()
task = processor.next_task() = {
'PIComputingID': 'dhf8r',
'ApprvlApprvr1': 'lb3dp',
'Subject': 'Email Script needs your help'
script = Email()
script.do_task(task, 'Subject', 'PIComputingID', 'ApprvlApprvr1')