# ask-community
n
Hey everyone, I have a weird situation that I'd like to hear what you think about. I have a parent flow that triggers a child flow. I get a failed status in the parent flow, with an error saying the new flow didn't get the params, but the new flow is actually triggered with the right params. What I can tell about the child flow is that it waits for about a minute to scale up its resources before starting
a
@Noam polak I assume you’re using a state handler? Can you share how you defined it? This may occur when you trigger a flow that has required parameters, but you didn’t pass them. The easiest way to solve it would be to assign sensible default values to your parameter tasks. What do you mean by scaling up resources before a flow run?
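For example, a minimal Prefect 1.x sketch of giving parameter tasks default values (the flow and parameter names here are just illustrative):

from prefect import Flow, Parameter

with Flow("child-flow") as flow:
    # Parameters with sensible defaults keep a run from failing
    # when the caller forgets to pass them explicitly
    input_data = Parameter("input_data", default={})
    request_id = Parameter("request_id", default=None)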
n
I'm not using any state handler on the parent flow, only in the child flow. The weird thing is that the child flow is triggered with the right parameters, but the parent gets a failed task status
a
Can you share both the child and parent flows? Otherwise, it's hard to give any recommendations
n
I mean that the pod I am using for the child flow is pretty large, so it takes some time before it really starts
ok
The part where I trigger the child flow is:
start_flow_run = StartFlowRun(flow_name=FS_FQ_FLOW, project_name="default")

    internal_input_data_updated = update_internal_input_data(internal_input_data)
    flow.set_dependencies(start_flow_run, [internal_input_data_updated, exists, run_config])
    start_flow_run(
        upstream_tasks=[company_ex_id, exists, internal_input_data_updated],
        parameters={
            "input_data": input_data,
            "internal_input_data": internal_input_data_updated,
            "request_id": request_id,
            "fq_run_id": fq_run_id,
        },
        run_config=run_config,
    )
and the child flow starts:
with Flow(
    "flow_name",
    run_config=KubernetesRun(
        job_template_path="flows_path/kubernetes_job_template.yaml",
        image=f"<http://gcr.io/some_project:{DOCKER_IMAGE_TAG}|gcr.io/some_project:{DOCKER_IMAGE_TAG}>"
    ),
    storage=GCS(bucket=BUCKET_NAME),
    state_handlers=[post_to_handler, update_state_handler],
    terminal_state_handler=require_success_terminal_state_handler,
) as flow:
    input_data = Parameter("input_data", required=True)
    internal_input_data = Parameter("internal_input_data", required=True)
    fq_run_id = Parameter("fq_run_id", required=True)
    request_id = Parameter("request_id", required=True)

    notification_recipient = Parameter("notification_recipient", default=None)

    currency = input_data["key"]

    # flow tasks....
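For reference, a flow-level state handler in Prefect 1.x (such as the ones registered above) follows the signature below; the body is only an illustrative sketch, not the actual post_to_handler or update_state_handler implementation:

def notify_on_failure(flow, old_state, new_state):
    # A flow state handler receives the flow object plus the old and new
    # states; returning a state (or None) lets the run proceed normally
    if new_state.is_failed():
        # e.g. post a message to an alerting channel here
        pass
    return new_state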
a
It looks like you’re calling the start_flow_run task twice in the parent flow, and this causes the error that you see. #1: Here you’re calling it without parameters:
flow.set_dependencies(start_flow_run, [internal_input_data_updated, exists, run_config])
#2: Here you’re calling it correctly
start_flow_run(
    upstream_tasks=[company_ex_id, exists, internal_input_data_updated],
    parameters={
        "input_data": input_data,
        "internal_input_data": internal_input_data_updated,
        "request_id": request_id,
        "fq_run_id": fq_run_id,
    },
    run_config=run_config,
)
What would be better and easier is to instantiate the StartFlowRun class before the Flow constructor:
from prefect.tasks.prefect import StartFlowRun

start_flow_run = StartFlowRun(flow_name=FS_FQ_FLOW, project_name="default")

with Flow(FLOW_NAME) as flow:
    internal_input_data_updated = update_internal_input_data(internal_input_data)
    start_flow_run_task = start_flow_run(
        upstream_tasks=[company_ex_id, exists, run_config, internal_input_data_updated],
        parameters={
            "input_data": input_data,
            "internal_input_data": internal_input_data_updated,
            "request_id": request_id,
            "fq_run_id": fq_run_id,
        },
        run_config=run_config,
    )
Alternatively, you can replace StartFlowRun with `create_flow_run`:
from prefect.tasks.prefect import create_flow_run

with Flow(FLOW_NAME) as flow:
    internal_input_data_updated = update_internal_input_data(internal_input_data)
    start_flow_run_task = create_flow_run(flow_name=FS_FQ_FLOW, project_name="default",
        upstream_tasks=[company_ex_id, exists, run_config, internal_input_data_updated],
        parameters={
            "input_data": input_data,
            "internal_input_data": internal_input_data_updated,
            "request_id": request_id,
            "fq_run_id": fq_run_id,
        },
        run_config=run_config,
    )
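If you also want the parent flow to wait for the child run and surface its final state, the run ID returned by create_flow_run can be passed to wait_for_flow_run. A sketch, assuming illustrative flow and parameter names:

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:
    # create_flow_run returns the ID of the newly created child flow run
    child_run_id = create_flow_run(
        flow_name="child-flow",
        project_name="default",
        parameters={"input_data": {"key": "USD"}},
    )
    # Block until the child run finishes; raise if it ends in a failed state
    wait_for_flow_run(child_run_id, raise_final_state=True)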
cc @Noam polak
n
Hey, it helped. Thanks a lot!