Patrick Tan

08/09/2022, 7:42 PM
Hi, Do you have example of create_flow_run_from_deployment, wait for completion of flow run before continue to next line of code execution in 2.0? I was using create_flow_run and wait_for_flow_run in 1.0
👀 1

Bianca Hoch

08/09/2022, 8:06 PM
Hello Patrick, here is an example for creating a flow run from deployment:

Patrick Tan

08/09/2022, 8:15 PM
I tried this example, it does not wait for completion. It is running async mode. I want it to wait for flow run to be completed. My 1.0 code looks like this:
Copy code
flow_run_ids = create_flow_run(
        run_name=unmapped("Flow Test-LiveLots-ETL"),"%m/%d/%Y:%H:%M:%S"),
        parameters={"config_file": config_file, "bucket": bucket, "prefix": prefix},

    wait_for_flow =
   wait_for_flow_run(flow_run_ids, raise_final_state=unmapped(True))

Bianca Hoch

08/09/2022, 8:21 PM
Ah, understood. One moment while we do some research on our end to troubleshoot this. We also received your email, but we can continue the conversation here if that works for you.

Patrick Tan

08/09/2022, 8:24 PM
Yes this works as well

Bianca Hoch

08/09/2022, 9:55 PM
I think that this article may be helpful for you to accomplish what you have in the 1.0 example, in the context of 2.0. In 2.0, subflows can be called from the parent flow upon execution. This can be set up so that downstream subflows are dependent on the state of those that are upstream. Note that in Orion, subflows are run sequentially by default. In addition, if you'd like to pass different parameters to the subflows, you could create a for loop for your parent flow which passes in parameters to the subflows at runtime. Here is an example of how to accomplish this:
Copy code
from prefect import flow

@flow(name = "Flow A")
def flow_A(params):
    print(f"I am flow A, and I received these parameters: {params}")

@flow(name = "Flow B")
def flow_b():
    print(f"I am flow B, I run after Flow A")

params_list = [{"config_file": "config_file", "bucket": "bucket", "prefix": "prefix"},
                {"config_file": "config_file2", "bucket2": "bucket2", "prefix": "prefix2"},
                {"config_file": "config_file3", "bucket2": "bucket3", "prefix": "prefix3"}]

@flow(name = "Parent Flow")
def parent_flow(params_list):
    for params in params_list:
    #Runs after Flow A is done


Patrick Tan

08/10/2022, 3:21 PM
I tried this method and it works. However, what I really want is to run a deployed flow and wait for completion. Is this possible in 2.0? If not, will it be in future release?

Mason Menges

08/10/2022, 9:35 PM
Hey @Patrick Tan The short answer to this questions is yes, we'll be expanding on orchestration patters more in the future, that said you could probably accomplish this with native python:
Copy code
def p_flow():
    flow_a = create_flow_run_from_deployment()
    flow_a_state = None

    while flow_a_state != "Completed":
           flow_a_state = task_to_get_flow_run_state(flow_a)
Not a working example but more of a conceptual idea, one of the benefits of 2.0 is that if it's accomplishable in native python there's likely a way to address it in 2.0
🙌 3
gratitude thank you 1