Email script now uses an email address instead of a UVA LDAP user_id.
This commit is contained in:
parent
750a202e99
commit
730d0ca18f
|
@ -7,6 +7,8 @@ from crc.scripts.script import Script
|
||||||
from crc.services.ldap_service import LdapService
|
from crc.services.ldap_service import LdapService
|
||||||
from crc.services.mails import send_mail
|
from crc.services.mails import send_mail
|
||||||
|
|
||||||
|
from email_validator import validate_email, EmailNotValidError
|
||||||
|
|
||||||
|
|
||||||
class Email(Script):
|
class Email(Script):
|
||||||
"""This Script allows to be introduced as part of a workflow and called from there, specifying
|
"""This Script allows to be introduced as part of a workflow and called from there, specifying
|
||||||
|
@ -30,7 +32,8 @@ Email Subject ApprvlApprvr1 PIComputingID
|
||||||
def do_task(self, task, *args, **kwargs):
|
def do_task(self, task, *args, **kwargs):
|
||||||
args = [arg for arg in args if type(arg) == str]
|
args = [arg for arg in args if type(arg) == str]
|
||||||
subject = self.get_subject(task, args)
|
subject = self.get_subject(task, args)
|
||||||
recipients = self.get_users_info(task, args)
|
# recipients = self.get_users_info(task, args)
|
||||||
|
recipients = self.get_email_recipients(task, args)
|
||||||
content, content_html = self.get_content(task)
|
content, content_html = self.get_content(task)
|
||||||
if recipients:
|
if recipients:
|
||||||
send_mail(
|
send_mail(
|
||||||
|
@ -41,6 +44,43 @@ Email Subject ApprvlApprvr1 PIComputingID
|
||||||
content_html=content_html
|
content_html=content_html
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_email_recipients(self, task, args):
|
||||||
|
emails = []
|
||||||
|
|
||||||
|
if len(args[1]) < 1:
|
||||||
|
raise ApiError(code="missing_argument",
|
||||||
|
message="Email script requires at least one argument, "
|
||||||
|
"an email address to process. "
|
||||||
|
"Multiple email addresses are accepted.")
|
||||||
|
if isinstance(args[1], str):
|
||||||
|
try:
|
||||||
|
valid = validate_email(args[1])
|
||||||
|
except EmailNotValidError as e:
|
||||||
|
# email is not valid, exception message is human-readable
|
||||||
|
raise ApiError(code="invalid_argument",
|
||||||
|
message="Email script requires a valid email address. "
|
||||||
|
"%s " % e)
|
||||||
|
print(str(e))
|
||||||
|
else:
|
||||||
|
emails.append(valid.email)
|
||||||
|
|
||||||
|
elif isinstance(args[1], list):
|
||||||
|
for arg in args[1]:
|
||||||
|
if isinstance(arg, str):
|
||||||
|
# TODO: need to validate
|
||||||
|
try:
|
||||||
|
valid = validate_email(args[1])
|
||||||
|
except EmailNotValidError as e:
|
||||||
|
# email is not valid, exception message is human-readable
|
||||||
|
raise ApiError(code="invalid_argument",
|
||||||
|
message="Email script requires a valid email address."
|
||||||
|
"Multiple address are allowed.")
|
||||||
|
print(str(e))
|
||||||
|
else:
|
||||||
|
emails.append(valid.email)
|
||||||
|
|
||||||
|
return emails
|
||||||
|
|
||||||
def get_users_info(self, task, args):
|
def get_users_info(self, task, args):
|
||||||
if len(args) < 1:
|
if len(args) < 1:
|
||||||
raise ApiError(code="missing_argument",
|
raise ApiError(code="missing_argument",
|
||||||
|
|
|
@ -1,20 +1,32 @@
|
||||||
from tests.base_test import BaseTest
|
from tests.base_test import BaseTest
|
||||||
|
from crc import mail
|
||||||
|
|
||||||
|
|
||||||
|
# class TestEmailDirectly(BaseTest):
|
||||||
|
#
|
||||||
|
# def test_email_directly(self):
|
||||||
|
# recipients = ['michaelc@cullerton.com']
|
||||||
|
# sender = 'michaelc@cullerton.com'
|
||||||
|
# with mail.record_messages() as outbox:
|
||||||
|
# mail.send_message(subject='testing',
|
||||||
|
# body='test',
|
||||||
|
# recipients=recipients,
|
||||||
|
# sender=sender)
|
||||||
|
# assert len(outbox) == 1
|
||||||
|
# assert outbox[0].subject == "testing"
|
||||||
|
|
||||||
|
|
||||||
class TestEmailScript(BaseTest):
|
class TestEmailScript(BaseTest):
|
||||||
|
|
||||||
def test_email_script(self):
|
def test_email_script(self):
|
||||||
|
with mail.record_messages() as outbox:
|
||||||
|
|
||||||
workflow = self.create_workflow('email_script')
|
workflow = self.create_workflow('email_script')
|
||||||
|
|
||||||
# Start the workflow.
|
first_task = self.get_workflow_api(workflow).next_task
|
||||||
first_task = self.get_workflow_api(workflow).next_task
|
# self.assertEqual('Activity_GetData', first_task.name)
|
||||||
# self.assertEqual('Activity_GetData', first_task.name)
|
workflow = self.get_workflow_api(workflow)
|
||||||
workflow = self.get_workflow_api(workflow)
|
self.complete_form(workflow, first_task, {'email_address': 'michaelc@cullerton.com'})
|
||||||
# self.complete_form(workflow, first_task, {'email_address': 'mike@sartography.com'})
|
|
||||||
# self.complete_form(workflow, first_task, {'email_address': 'kcm4zc'}, user_uid='kcm4zc')
|
self.assertEqual(1, len(outbox))
|
||||||
result = self.complete_form(workflow, first_task, {'email_address': "'kcm4zc'"})
|
self.assertEqual("My Email Subject", outbox[0].subject)
|
||||||
print(result)
|
|
||||||
task = self.get_workflow_api(workflow).next_task
|
|
||||||
self.assertEqual(task.name, 'string')
|
|
||||||
# self.assertEqual('Activity_HowMany', workflow.next_task.name)
|
|
||||||
|
|
Loading…
Reference in New Issue