Patrick Tan
08/09/2022, 7:42 PM
Bianca Hoch
08/09/2022, 8:06 PM
Patrick Tan
08/09/2022, 8:15 PM
flow_run_ids = create_flow_run(
    flow_name=unmapped("LiveLots-ETL"),
    project_name=unmapped(f"{prefect_project}"),
    run_name=unmapped("Flow Test-LiveLots-ETL"),
    idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"),
    parameters={"config_file": config_file, "bucket": bucket, "prefix": prefix},
)
# Wait on the flow run(s) created above. The assignment and the call must be
# one statement: a bare `wait_for_flow =` followed by a line break is a
# syntax error.
wait_for_flow = wait_for_flow_run(
    flow_run_ids, raise_final_state=unmapped(True)
)
Bianca Hoch
08/09/2022, 8:21 PM
Patrick Tan
08/09/2022, 8:24 PM
Bianca Hoch
08/09/2022, 9:55 PM
from prefect import flow
@flow(name="Flow A")
def flow_A(params):
    """Child flow: print the parameters it was called with."""
    message = f"I am flow A, and I received these parameters: {params}"
    print(message)
@flow(name="Flow B")
def flow_b():
    """Follow-up flow: print a fixed message; takes no parameters."""
    # Plain string literal: the message has no placeholders, so an f-string
    # prefix is unnecessary.
    print("I am flow B, I run after Flow A")
# Example parameter sets, one dict per Flow A run. Every entry uses the same
# three keys ("config_file", "bucket", "prefix") so each run receives an
# identically shaped dict.
params_list = [
    {"config_file": "config_file", "bucket": "bucket", "prefix": "prefix"},
    {"config_file": "config_file2", "bucket": "bucket2", "prefix": "prefix2"},
    {"config_file": "config_file3", "bucket": "bucket3", "prefix": "prefix3"},
]
@flow(name="Parent Flow")
def parent_flow(params_list):
    """Call Flow A once per entry in ``params_list``, then call Flow B."""
    for run_params in params_list:
        flow_A(run_params)
    # Runs after Flow A is done
    # NOTE(review): the pasted snippet lost its indentation; calling flow_b()
    # once after the loop (rather than once per iteration) is an assumption
    # -- confirm against the original message.
    flow_b()
# Start the parent flow with the example parameter sets defined above.
parent_flow(params_list)
Patrick Tan
08/10/2022, 3:21 PM
Mason Menges
08/10/2022, 9:35 PM
@flow
def p_flow():
    """Conceptual sketch: start a run from a deployment, then poll its state.

    Not runnable as written: ``sometimeinseconds`` and
    ``task_to_get_flow_run_state`` are placeholder names, and
    ``create_flow_run_from_deployment`` is called without arguments.
    """
    child_run = create_flow_run_from_deployment()
    # Sleep before every check, so at least one interval elapses before the
    # first state lookup.
    # NOTE(review): the loop only exits on the exact state "Completed"; there
    # is no timeout or handling for any other outcome.
    while True:
        time.sleep(sometimeinseconds)
        if task_to_get_flow_run_state(child_run) == "Completed":
            break
This is not a working example, but more of a conceptual idea. One of the benefits of 2.0 is that if something can be accomplished in native Python, there's likely a way to do it in 2.0.