@task(name="First", result=PrefectResult(), slug="first-slug")
def first() -> dict:
    """Log a marker message and return a small sample payload.

    Returns:
        The literal dictionary ``{"users": "thing"}``.
    """
    # The logger is looked up in the Prefect context rather than created here.
    logger = prefect.context.get("logger")
    # The original line held chat-client link markup
    # (`<http://logger.info|logger.info>`), which is not valid Python.
    logger.info("ONE!")
    return {"users": "thing"}
# Email task configured with a fixed subject, body and recipient; the sender
# address and SMTP settings are read from ``email_config``.
email_task = EmailTask(
    name="email_task",
    subject="Test from ATD",
    msg="this is a test",
    email_to="chia.berry@***.gov",  # <- Type your email here
    attachments=None,
    # Each of these keyword arguments takes the value stored in email_config
    # under the identically named key.
    **{
        setting: email_config[setting]
        for setting in ("email_from", "smtp_server", "smtp_port", "smtp_type")
    },
)
# Upstream flow: contains a single call to the ``first`` task and is
# configured with PrefectResult as its result handler.
with Flow(
    f"dependent_flow_one_{current_environment}",
    result=PrefectResult(),
    # The run is labelled with the current environment name.
    run_config=UniversalRun(labels=[current_environment]),
    storage=GitHub(
        repo="cityofaustin/atd-prefect",
        ref="7368-knack-banner",  # The branch name
        path="flows/test/dependent_flows.py",
    ),
) as first_flow:
    email_data = first()
# Downstream flow: creates a run of ``first_flow``, fetches the result of one
# of its task runs, and hands that result to ``email_task``.
with Flow(
    f"dependent_flows_email_{current_environment}",
    storage=GitHub(
        repo="cityofaustin/atd-prefect",
        path="flows/test/dependent_flows.py",
        ref="7368-knack-banner",  # The branch name
    ),
    run_config=UniversalRun(labels=[current_environment]),
) as second_flow:
    # The upstream flow is looked up by its name.
    first_flow_run_id = create_flow_run(flow_name=first_flow.name)

    # NOTE(review): the slug carries a "-copy" suffix compared with the
    # slug="first-slug" declared on the ``first`` task -- presumably Prefect
    # renames the slug when the task is copied into a flow; confirm against
    # the registered flow.
    first_data = get_task_run_result(first_flow_run_id, task_slug="first-slug-copy")

    # Pass the fetched result as a run-time argument. The previous form,
    # ``task_args=dict(msg_plain=first_data)``, set the task *attribute* to
    # the Task object itself at build time instead of its result, and then
    # needed a manual ``add_edge``. Binding it as a keyword argument both
    # supplies the value and creates the dependency edge.
    # NOTE(review): ``first`` returns a dict, while a plain-text email part is
    # presumably expected to be a string -- confirm whether the payload needs
    # formatting before it is emailed.
    email_task(msg_plain=first_data)
# Run the email flow only when this file is executed directly as a script,
# not when it is imported.
if __name__ == "__main__":
    second_flow.run()