Hi! My Flow schematic looks funny. I am using Star...
# ask-community
l
Hi! My Flow schematic looks funny. I am using StartFlowRun to launch sub-flows. Basically everything works, but the schematic and logs look... funny. Am I doing something wrong?
@task
def parameter_or_yesterday(p):
    res = (p or (DateTime.today() - timedelta(days=1))).strftime('%Y-%m-%d')
    set_run_name(res)
    return res


def set_run_name(date_param: str):
    client = prefect.Client()
    name = f"{client.get_flow_run_info(prefect.context.flow_run_id).name}_{date_param}"
    client.set_flow_run_name(prefect.context.flow_run_id, name)


extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)
#
# Parent flow, executed on Kubernetes agent
#
with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"]), schedule=CronSchedule(cron="0 5 * * *")) as parent_flow:
    date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))

    extract_task = extract_flow_task(parameters={"date": date_param})
    transform_flow_task(upstream_tasks=[extract_task], parameters={"date": date_param})
Here is the code ^
Also the list of tasks for the Flow looks funny
k
Hi @Lauri Makinen, it does look a bit funky. Can I see what the DateTimeParameter definition looks like?
l
date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
For `parent_flow` it is this line here (this looks a bit funny too, but I couldn't think of any other way for `date_param` to get a dynamic default value; that is, when `date` is not given as a parameter, it should default to 'yesterday' relative to the date the flow is run). In the 'child' flows (like `extract`), the `date_param` is fetched like so:
with Flow('extract', storage=storage, run_config=LocalRun(labels=["on_prem"])) as extract_flow:
    date_param = DateTimeParameter("date")
    extract_data(date_param)
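For reference, the "default to yesterday" fallback described above can be sketched in plain Python, independent of Prefect. This is only a rough sketch under assumptions: `resolve_run_date` and its `today` argument are hypothetical names chosen for illustration, not part of Prefect or of the flow code above, and it checks for `None` explicitly instead of using `or`.

```python
from datetime import date, datetime, timedelta
from typing import Optional


def resolve_run_date(value: Optional[datetime], today: Optional[date] = None) -> str:
    """Return `value` as 'YYYY-MM-DD', or yesterday's date when `value` is None.

    Hypothetical helper: `today` is injectable only so the fallback
    can be checked deterministically; it defaults to the current date.
    """
    if value is not None:
        # An explicit parameter value always wins over the default.
        return value.strftime("%Y-%m-%d")
    reference = today if today is not None else date.today()
    return (reference - timedelta(days=1)).strftime("%Y-%m-%d")
```

Inside a task, something like `resolve_run_date(p)` could stand in for the `p or (...)` expression, assuming the parameter arrives as a datetime-like object or `None`.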
k
Will look more tomorrow, but what I meant to ask is if DateTimeParameter is something you created?
l
Thanks! DateTimeParameter is a class from prefect
from prefect.core.parameter import DateTimeParameter
k
Gotcha. Yeah, your code looks right. I think the schematic is just weird because of the way `StartFlowRun` takes in the params.