Lauri Makinen
08/09/2021, 12:27 PM
Lauri Makinen
08/09/2021, 12:28 PM
@task
def parameter_or_yesterday(p):
    res = (p or (DateTime.today() - timedelta(days=1))).strftime('%Y-%m-%d')
    set_run_name(res)
    return res

def set_run_name(date_param: str):
    client = prefect.Client()
    name = f"{client.get_flow_run_info(prefect.context.flow_run_id).name}_{date_param}"
    client.set_flow_run_name(prefect.context.flow_run_id, name)

extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)
#
# Parent flow, executed on Kubernetes agent
#
with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"]), schedule=CronSchedule(cron="0 5 * * *")) as parent_flow:
    date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
    extract_task = extract_flow_task(parameters={"date": date_param})
    transform_flow_task(upstream_tasks=[extract_task], parameters={"date": date_param})
Lauri Makinen
08/09/2021, 12:28 PM
Lauri Makinen
08/09/2021, 12:29 PM
Lauri Makinen
08/09/2021, 12:29 PM
Kevin Kho
Lauri Makinen
08/10/2021, 7:01 AM
For parent_flow it is this line:
date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
This looks a bit odd, but I couldn't think of another way to give date_param a dynamic default value: when date is not passed as a parameter, it should default to 'yesterday' relative to the date the flow is run. In the 'child' flows (like extract), the `date_param` is fetched like so:
with Flow('extract', storage=storage, run_config=LocalRun(labels=["on_prem"])) as extract_flow:
    date_param = DateTimeParameter("date")
    extract_data(date_param)
Kevin Kho
Lauri Makinen
08/10/2021, 7:26 AM
from prefect.core.parameter import DateTimeParameter
Kevin Kho
StartFlowRun takes in the params.
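For reference, the "default to yesterday" logic discussed above can be sketched in plain Python, outside of Prefect. This is only an illustration under stated assumptions: the helper names date_or_yesterday and child_parameters are hypothetical and not part of Prefect or of the flows in this thread, and the `today` argument exists only so the behaviour can be checked deterministically.

```python
from datetime import date, datetime, timedelta
from typing import Optional


def date_or_yesterday(value: Optional[datetime], today: Optional[date] = None) -> str:
    """Format `value` as YYYY-MM-DD, or fall back to the day before `today`.

    Hypothetical helper: mirrors the idea of parameter_or_yesterday from the
    thread, without any Prefect dependency.
    """
    if today is None:
        today = date.today()
    # Use the supplied date when present, otherwise yesterday.
    chosen = value.date() if value is not None else today - timedelta(days=1)
    return chosen.strftime("%Y-%m-%d")


def child_parameters(value: Optional[datetime], today: Optional[date] = None) -> dict:
    """Build the kind of parameters dict that would be handed to a child flow run."""
    return {"date": date_or_yesterday(value, today)}
```

In the parent flow from the thread, the resulting string is what ends up in the `parameters={"date": date_param}` argument of each StartFlowRun call, so both child flows receive the same resolved date.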