Lauri Makinen
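08/09/2021, 12:27 PM
Lauri Makinen
08/09/2021, 12:28 PM
```python
from datetime import datetime as DateTime, timedelta  # assumption: DateTime is the stdlib datetime class
import prefect
from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter
from prefect.run_configs import KubernetesRun
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun
# storage, PROJECT_NAME, extract_flow and transform_flow are defined elsewhere in the script

@task
def parameter_or_yesterday(p):
    # Use the "date" parameter if given, otherwise fall back to yesterday (relative to now)
    res = (p or (DateTime.today() - timedelta(days=1))).strftime('%Y-%m-%d')
    set_run_name(res)
    return res

def set_run_name(date_param: str):
    # Rename the current flow run to "<current run name>_<date>"
    client = prefect.Client()
    name = f"{client.get_flow_run_info(prefect.context.flow_run_id).name}_{date_param}"
    client.set_flow_run_name(prefect.context.flow_run_id, name)

# Each StartFlowRun task triggers a registered child flow and waits for it to finish
extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)

#
# Parent flow, executed on Kubernetes agent
#
with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"]), schedule=CronSchedule(cron="0 5 * * *")) as parent_flow:
    date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
    extract_task = extract_flow_task(parameters={"date": date_param})
    transform_flow_task(upstream_tasks=[extract_task], parameters={"date": date_param})
```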
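The date-defaulting and renaming logic in `parameter_or_yesterday` and `set_run_name` can be checked without a Prefect backend. Below is a minimal plain-Python sketch; `resolve_date` and `build_run_name` are hypothetical helpers that mirror those two functions, and `today` is passed in explicitly (an assumption, made so the result is deterministic) instead of calling `DateTime.today()`.

```python
from datetime import datetime, timedelta
from typing import Optional


# Hypothetical helpers mirroring parameter_or_yesterday and set_run_name, minus Prefect
def resolve_date(p: Optional[datetime], today: datetime) -> str:
    """Return p as 'YYYY-MM-DD', or the day before `today` when p is None."""
    return (p or (today - timedelta(days=1))).strftime("%Y-%m-%d")


def build_run_name(current_name: str, date_param: str) -> str:
    """Append the resolved date to the current flow-run name."""
    return f"{current_name}_{date_param}"


today = datetime(2021, 8, 10, 5, 0)  # e.g. the 05:00 cron run on 08/10/2021
print(resolve_date(None, today))                  # 2021-08-09
print(resolve_date(datetime(2021, 7, 1), today))  # 2021-07-01
# "crimson-lemur" is a made-up run name for illustration
print(build_run_name("crimson-lemur", "2021-08-09"))  # crimson-lemur_2021-08-09
```

Passing `today` in also makes one design choice visible: in the flow, "yesterday" is computed relative to when the task executes.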
Lauri Makinen
08/09/2021, 12:28 PM
Lauri Makinen
08/09/2021, 12:29 PM
Lauri Makinen
08/09/2021, 12:29 PM
Kevin Kho
Lauri Makinen
08/10/2021, 7:01 AM
`date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))`
For `parent_flow` it is this line here (this looks a bit funny too, but I couldn't think of any other way for `date_param` to get a dynamic default value; that is, when `date` is not given as a parameter, it should default to 'yesterday' relative to the date the flow runs). In the 'child' flows (like `extract`), the `date_param` is fetched like so:
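```python
from prefect import Flow
from prefect.core.parameter import DateTimeParameter
from prefect.run_configs import LocalRun
with Flow('extract', storage=storage, run_config=LocalRun(labels=["on_prem"])) as extract_flow:
    date_param = DateTimeParameter("date")
    extract_data(date_param)  # storage and extract_data are defined elsewhere
```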
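The parent formats the date as a `'YYYY-MM-DD'` string and hands it to the child's `DateTimeParameter("date")` through the `parameters` passed to the `StartFlowRun` task, so the child has to parse that string back into a datetime. Prefect does its own parsing there; the sketch below uses the standard library's `datetime.fromisoformat` as a stand-in to show the round trip, and `parse_date_param` is a hypothetical name.

```python
from datetime import datetime


# Hypothetical helper; Prefect's DateTimeParameter does its own parsing
def parse_date_param(value: str) -> datetime:
    """Turn the 'YYYY-MM-DD' string sent by the parent flow back into a datetime."""
    return datetime.fromisoformat(value)


sent = "2021-08-09"  # what the parent passes via parameters={"date": date_param}
received = parse_date_param(sent)
print(received)                               # 2021-08-09 00:00:00
print(received.strftime("%Y-%m-%d") == sent)  # True
```

Because only the date is sent, the child sees midnight of that day; any time of day on an explicit `date` parameter is dropped by the parent's `strftime`.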
Kevin Kho
Lauri Makinen
08/10/2021, 7:26 AM
`from prefect.core.parameter import DateTimeParameter`
Kevin Kho
`StartFlowRun` takes in the params.