Thread
#prefect-community
    Lauri Makinen

    Lauri Makinen

    1 year ago
    Hi! My Flow schematic looks funny. I am using StartFlowRun to launch sub-flows. Basically everything works, but the schematic and logs look... funny. Am I doing something wrong?
    @task
    def parameter_or_yesterday(p):
        res = (p or (DateTime.today() - timedelta(days=1))).strftime('%Y-%m-%d')
        set_run_name(res)
        return res
    
    
    def set_run_name(date_param: str):
        client = prefect.Client()
        name = f"{client.get_flow_run_info(prefect.context.flow_run_id).name}_{date_param}"
        client.set_flow_run_name(prefect.context.flow_run_id, name)
    
    
    extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
    transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)
    #
    # Parent flow, executed on Kubernetes agent
    #
    with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"]), schedule=CronSchedule(cron="0 5 * * *")) as parent_flow:
        date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
    
        extract_task = extract_flow_task(parameters={"date": date_param})
        transform_flow_task(upstream_tasks=[extract_task], parameters={"date": date_param})
    Here is the code ^
    Also the list of tasks for the Flow looks funny
    Kevin Kho

    Kevin Kho

    1 year ago
    Hi @Lauri Makinen, it does look a bit funky. Can I see what the DateTimeParameter defnition looks like?
    Lauri Makinen

    Lauri Makinen

    1 year ago
    date_param = parameter_or_yesterday(DateTimeParameter("date", required=False))
    For
    parent_flow
    it is this line here (this looks a bit funny too, but I couldn't think of any other way for
    date_param
    to get a dynamic default value, that is, when
    date
    is not given as parameter it should default to 'yesterday' from date the flow is run). In the 'child' flows (like
    extract
    ), the date_paramis fetched like so:
    with Flow('extract', storage=storage, run_config=LocalRun(labels=["on_prem"])) as extract_flow:
        date_param = DateTimeParameter("date")
        extract_data(date_param)
    Kevin Kho

    Kevin Kho

    1 year ago
    Will look more tomorrow, but what I meant to ask is if DateTimeParameter is something you created?
    Lauri Makinen

    Lauri Makinen

    1 year ago
    Thanks! DateTimeParameter is a class from prefect
    from prefect.core.parameter import DateTimeParameter
    Kevin Kho

    Kevin Kho

    1 year ago
    Gotcha. Yeah your code looks right. I think the schematic is just weird because of the way the
    StartFlowRun
    takes in the params.