Thread
#prefect-community
    Josephine Douglas

    Josephine Douglas

    5 months ago
    Hello! I am trying to create a flow that runs a task, sends the data resulting from that task to a secondary flow, waits for the secondary flow to run successfully, then runs another task. [code in thread] The primary and secondary flows are both registered in the same tenant and project but are set to use different images. When I try to run the primary flow, I get this error:
    Failed to load and execute Flow's environment: TypeError("got an unexpected keyword argument 'raise_final_state'")
    What am I doing incorrectly here?
    Kevin Kho

    Kevin Kho

    5 months ago
    Hey Josephine, I think you have an old Prefect version that doesn’t have that keyword argument. What version are you on?
    Could you move the code to the thread whenever you get the chance so we don’t crowd the main channel. You can do something like “details in thread” and then just paste the code here
    raise_final_state
    was added in 0.15.8 here
    Josephine Douglas

    Josephine Douglas

    5 months ago
    primary flow:
    daily_schedule = CronSchedule("30 9 * * *", start_date=pendulum.now(tz="US/Pacific"))
    
    flow = Flow("primary_flow", schedule=daily_schedule)
    
    with flow:
        x, y = task1()
        secondary_flow = create_flow_run(flow_name='secondary_flow', project_name='ProjectName', parameters={'x': x})
        wait_for_secondary_flow = wait_for_flow_run(secondary_flow, raise_final_state=True)
        task2_task = task2(y)
        task2_task.set_upstream(wait_for_secondary_flow)
    
    # STORAGE
    flow.storage = GitLab(...)
    
    flow.run_config = KubernetesRun(
        labels=["dev"],
        image="",
        cpu_limit="400m",
        cpu_request="100m",
        memory_limit="4096Mi",
        memory_request="256Mi"
    )
    secondary_flow looks like this:
    flow = Flow("secondary_flow", executor=LocalDaskExecutor(scheduler="threads", num_workers=8))
    
    with flow:
        brand_ids = Parameter("x", default=["test_not_exist_prefect"])
        ...
    
    
    # STORAGE
    flow.storage = GitLab(...)
    
    flow.run_config = KubernetesRun(
        labels=["dev"],
        image="",
        cpu_limit="400m",
        cpu_request="100m",
        memory_limit="4096Mi",
        memory_request="256Mi"
    )
    Kevin Kho

    Kevin Kho

    5 months ago
    Thank you!
    Josephine Douglas

    Josephine Douglas

    5 months ago
    @Kevin Kho thank YOU! I will look into the version but I think you are correct