Patrick Tan
04/01/2022, 12:00 PMwith Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        print(i)Anna Geller
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor
with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )Anna Geller
idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S")with Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)Patrick Tan
04/01/2022, 1:11 PMPatrick Tan
04/01/2022, 1:14 PMKevin Kho
Patrick Tan
04/01/2022, 3:34 PMKevin Kho
create_flow_run.map()Anna Geller
Anna Geller
from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from datetime import timedelta
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
    return "some_data"
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_1(x):
    # some cleaning
    return x
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_2(x):
    # some cleaning
    return x
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_15(x):
    # some cleaning
    return x
with Flow("clean all files", executor=LocalExecutor()) as flow:
    all_files = Parameter(
        "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
    )
    data = read_data_from_file.map(all_files)
    transformed_data = etl_1.map(data)
    transformed_data = etl_2.map(transformed_data)
    transformed_data = etl_15.map(transformed_data)etl_1.map(data)create_flow_run.map(args)Patrick Tan
04/01/2022, 6:26 PMKevin Kho
Patrick Tan
04/01/2022, 7:53 PMflow_idflow_nameflow_idflow_nameflow_idflow_nameflow_idflow_namePatrick Tan
04/01/2022, 8:00 PMmapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                       "bucket":"wp-ahdata-sandbox",
                                                       "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                      {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                       "bucket":"wp-ahdata-sandbox",
                                                       "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])Kevin Kho
create_flow_runPatrick Tan
04/01/2022, 8:27 PMwith Flow("parent-flow") as flow:
    #filelist = get_filelist_task()
    mapped_flow_run_ids = create_flow_run.map(flow_name="livelots flow",
                                              project_name="LiveLots-ETL",
                                              parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                              )
    flow_a = create_flow_run(flow_id=mapped_flow_run_ids,
                            idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow.run()Kevin Kho
create_flow_run.map(flow_name="livelots flow",
                    project_name="LiveLots-ETL",create_flow_run.map(flow_name=unmapped("livelots flow"),
                    project_name=("LiveLots-ETL"),from prefect import unmappedPatrick Tan
04/01/2022, 8:47 PMwith Flow("parent-flow") as flow:
    #filelist = get_filelist_task()
    mapped_flow_run_ids = create_flow_run.map(flow_name=unmapped("livelots flow"),
                                              project_name=unmapped("LiveLots-ETL"),
                                              idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"),
                                              parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                              )
    wait_for_flow_a = wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
    flow.run()Kevin Kho
List[str]Patrick Tan
04/01/2022, 8:49 PMKevin Kho
Kevin Kho
wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)Kevin Kho
Patrick Tan
04/01/2022, 9:15 PM