
chia berry

02/23/2022, 7:46 PM
Hey y’all, I’m looking for some help with sending emails with child/parent flows. I have a setup like this: https://discourse.prefect.io/t/how-to-pass-data-from-a-child-flow-to-another-task-in-a-parent-flow/126, but with an EmailTask instead of the transform_and_show task. I want to send the data result in the body of the email. However, I am getting the message AttributeError: 'FunctionTask' object has no attribute 'encode'. If I make it a string, I get an empty email. I’d like to send the actual result and not the FunctionTask.

Kevin Kho

02/23/2022, 7:48 PM
Hi @chia berry, I think I know what is causing this. Could you show me what you are trying now that causes the error?

chia berry

02/23/2022, 7:49 PM
yes, thank you, let me tidy it up and paste here
@task(name="First", result=PrefectResult(), slug="first-slug")
def first():
    logger = prefect.context.get("logger")
    logger.info("ONE!")
    return {"users": "thing"}


email_task = EmailTask(
    name="email_task",
    subject="Test from ATD",
    msg="this is a test",
    email_to="chia.berry@***.gov",  # <- Type your email here
    email_from=email_config["email_from"],
    smtp_server=email_config["smtp_server"],
    smtp_port=email_config["smtp_port"],
    smtp_type=email_config["smtp_type"],
    attachments=None
)

with Flow(
    f"dependent_flow_one_{current_environment}",
    storage=GitHub(
        repo="cityofaustin/atd-prefect",
        path="flows/test/dependent_flows.py",
        ref="7368-knack-banner",  # The branch name
    ),
    run_config=UniversalRun(
        labels=[current_environment]
    ),
    result=PrefectResult()
) as first_flow:
    email_data = first()


with Flow(
    f"dependent_flows_email_{current_environment}",
    storage=GitHub(
        repo="cityofaustin/atd-prefect",
        path="flows/test/dependent_flows.py",
        ref="7368-knack-banner",  # The branch name
    ),
    run_config=UniversalRun(
        labels=[current_environment]
    ),
) as second_flow:
    first_flow_run_id = create_flow_run(flow_name=first_flow.name)
    first_data = get_task_run_result(first_flow_run_id, task_slug="first-slug-copy")
    second_flow.add_edge(first_data, email_task(task_args=dict(msg_plain=first_data)))


if __name__ == "__main__":
    second_flow.run()

Kevin Kho

02/23/2022, 7:52 PM
how do you get email_config?
I think you can change
second_flow.add_edge(first_data, email_task(task_args=dict(msg_plain=first_data)))
to
email_task(msg_plain=first_data)

chia berry

02/23/2022, 7:54 PM
email_config = get_key_value(key="aws_email_config")
and i am successfully getting emails sent to me

Kevin Kho

02/23/2022, 7:54 PM
You are using task_args wrong. task_args is for changing things like timeout=… and max_retries=..., i.e. stuff that goes into the task decorator.

chia berry

02/23/2022, 7:55 PM
thank you

Kevin Kho

02/23/2022, 7:55 PM
Of course! Did that work?

chia berry

02/23/2022, 7:58 PM
unfortunately not yet, I now get "AttributeError: 'dict' object has no attribute 'encode'"

Kevin Kho

02/23/2022, 8:01 PM
Oh cuz data is a dict right? And the EmailTask takes a string? You might need an intermediate task to format it?
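For anyone landing here later, a rough sketch of what such an intermediate formatting step might look like. It uses only the standard library so it runs on its own. The helper name format_email_body is made up for illustration, and the MIMEText call is only a stand-in based on the assumption that EmailTask builds its message body that way, which would explain the 'encode' error.

```python
import json
from email.mime.text import MIMEText


def format_email_body(data: dict) -> str:
    """Hypothetical helper: turn the child flow's dict result into plain text."""
    return json.dumps(data, indent=2, sort_keys=True)


# Same shape as the value returned by the `first` task in the thread.
first_data = {"users": "thing"}

# Handing the dict straight to MIMEText fails because a dict has no .encode().
# (Assumption: EmailTask does something similar internally.)
try:
    MIMEText(first_data, "plain")
    raw_dict_accepted = True
except AttributeError:
    raw_dict_accepted = False

# Formatting first gives a real string, which MIMEText accepts.
body = format_email_body(first_data)
message = MIMEText(body, "plain")
```

In the flow itself, the helper would presumably be decorated with @task and its output passed along, something like email_task(msg_plain=format_email_body(first_data)). That wiring is not tested here.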

chia berry

02/23/2022, 8:17 PM
@Kevin Kho thanks a lot for your super speedy help. I’m working on getting my code working but I just wanted to say how much I appreciate your help :thank-you:

Kevin Kho

02/23/2022, 8:17 PM
Of course!