Lauri Makinen
08/09/2021, 12:27 PMLauri Makinen
08/09/2021, 12:28 PM@task
def parameter_or_yesterday(p):
    res = (p or (DateTime.today() - timedelta(days=1))).strftime('%Y-%m-%d')
    set_run_name(res)
    return res
def set_run_name(date_param: str):
    client = prefect.Client()
    name = f"{client.get_flow_run_info(prefect.context.flow_run_id).name}_{date_param}"
    client.set_flow_run_name(prefect.context.flow_run_id, name)
extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)
#
# Parent flow, executed on Kubernetes agent
#
with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"]), schedule=CronSchedule(cron="0 5 * * *")) as parent_flow:
    date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
    extract_task = extract_flow_task(parameters={"date": date_param})
    transform_flow_task(upstream_tasks=[extract_task], parameters={"date": date_param})Lauri Makinen
08/09/2021, 12:28 PMLauri Makinen
08/09/2021, 12:29 PMLauri Makinen
08/09/2021, 12:29 PMKevin Kho
Lauri Makinen
08/10/2021, 7:01 AMdate_param = parameter_or_yesterday(DateTimeParameter("date", required=False))parent_flowdate_paramdateextractwith Flow('extract', storage=storage, run_config=LocalRun(labels=["on_prem"])) as extract_flow:
    date_param = DateTimeParameter("date")
    extract_data(date_param)Kevin Kho
Lauri Makinen
08/10/2021, 7:26 AMfrom prefect.core.parameter import DateTimeParameterKevin Kho
StartFlowRun