    Patrick Tan

    5 months ago
    Hi, Flow A runs Flow B repeatedly in a loop. I want the next instance of Flow B to start only after the current one has completed. I ran the code below, and 2 instances of the same flow ran at the same time with the same flow ID (see screenshot).
    with Flow("parent-flow") as flow:
    
        for i in range(2):
            flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
            wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
            print(i)
    Anna Geller

    5 months ago
    A couple of issues here:
    1. You can't run for loops in Prefect 1.0. This is possible in Prefect 2.0, but in 1.0, within the Flow block, you can only construct your DAG, i.e., call your tasks; no for loops allowed. Technically your for loop works because it iterates over a static list of values, but it's still not something you should do in 1.0. Mapping would be a better approach; check the example below.
    2. The print statement is also not allowed.
    from prefect import Flow, unmapped
    from prefect.tasks.prefect import create_flow_run
    from prefect.executors import LocalDaskExecutor
    
    with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
        mapped_flow_run_ids = create_flow_run.map(
            flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
            project_name=unmapped("your_project_name"),
        )
    And actually, regarding your issue: it's most likely due to the idempotency key. If you add idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"), this should work:
    with Flow("parent-flow") as flow:
    
        for i in range(2):
            flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
            wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    Patrick Tan

    5 months ago
    I added the code, and now the child flow only executes once. I expected it to run twice (for i in range(2)).
    @Anna Geller, I missed your first response; let me review it first.
    Kevin Kho

    5 months ago
    Yeah this looks like an idempotency key issue. The docstring of the idempotency key in the task gives a good description of what is happening.
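A small sketch of why a timestamp-based key may collide here. It assumes, as the thread suggests, that runs sharing an idempotency key are deduplicated into one, and that both loop iterations are evaluated within the same second while the flow is being built. The helper names `timestamp_key` and `indexed_keys` are hypothetical, and the index suffix is only one possible way to keep keys distinct.

```python
from datetime import datetime

# Format string taken from the thread; it has one-second resolution.
KEY_FORMAT = "%m/%d/%Y:%H:%M:%S"


def timestamp_key(now: datetime) -> str:
    """Build a key from a timestamp alone (hypothetical helper)."""
    return now.strftime(KEY_FORMAT)


def indexed_keys(now: datetime, count: int) -> list:
    """Build `count` keys that stay distinct by appending the loop index."""
    base = now.strftime(KEY_FORMAT)
    return [f"{base}-{i}" for i in range(count)]


# Two moments inside the same second, as in a fast two-iteration loop.
first = datetime(2022, 4, 1, 15, 50, 49, 100)
second = datetime(2022, 4, 1, 15, 50, 49, 900)

print(timestamp_key(first) == timestamp_key(second))  # True: the keys collide
print(indexed_keys(first, 2))  # ['04/01/2022:15:50:49-0', '04/01/2022:15:50:49-1']
```

If the two keys are equal, only one child run would be expected, which matches the "only executes once" report above.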
    Patrick Tan

    5 months ago
    @Kevin Kho, I have a parent flow with a task that returns a list of files. For each file, I want to run a child flow, passing the filename as a parameter. I am scratching my head since a task is not iterable. Do you have any ideas or sample code?
    Kevin Kho

    5 months ago
    You can use create_flow_run.map(), which is analogous to a for loop. You just need to supply unique idempotency keys.
    Anna Geller

    5 months ago
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalExecutor
    from datetime import timedelta
    
    
    @task(max_retries=5, retry_delay=timedelta(minutes=10))
    def read_data_from_file(file_name: str):
        return "some_data"
    
    
    @task(max_retries=5, retry_delay=timedelta(minutes=10))
    def etl_1(x):
        # some cleaning
        return x
    
    
    @task(max_retries=5, retry_delay=timedelta(minutes=10))
    def etl_2(x):
        # some cleaning
        return x
    
    
    @task(max_retries=5, retry_delay=timedelta(minutes=10))
    def etl_15(x):
        # some cleaning
        return x
    
    
    with Flow("clean all files", executor=LocalExecutor()) as flow:
        all_files = Parameter(
            "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
        )
        data = read_data_from_file.map(all_files)
        transformed_data = etl_1.map(data)
        transformed_data = etl_2.map(transformed_data)
        transformed_data = etl_15.map(transformed_data)
    You could basically do the same, but instead of etl_1.map(data), it would be create_flow_run.map(args).
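A plain-Python sketch of the "one child run per file" idea: turn a list of file names into a list of parameter dictionaries, one per child flow run. The function name `build_child_parameters` and the parameter key `"file_name"` are hypothetical; the real key must match whatever Parameter the child flow declares.

```python
from typing import Dict, List


def build_child_parameters(file_names: List[str]) -> List[Dict[str, str]]:
    """Return one parameter dict per file.

    "file_name" is an assumed parameter name, used only for illustration.
    """
    return [{"file_name": name} for name in file_names]


files = ["file1.csv", "file2.csv"]
child_parameters = build_child_parameters(files)
print(child_parameters)  # [{'file_name': 'file1.csv'}, {'file_name': 'file2.csv'}]
```

Since a task result is not iterable while the flow is being built, a function like this would presumably itself be wrapped as a task, with its result passed as the mapped parameters argument.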
    Patrick Tan

    5 months ago
    @Anna Geller, can you tell me what's wrong with my code? I used create_flow_run.map for all the files, which are passed as parameters, and then create_flow_run. Sorry, I am new to this:
    with Flow("parent-flow") as flow:
        mapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                              {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])
        flow_a = create_flow_run(flow_id=mapped_flow_run_ids, flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        flow.run()
    Kevin Kho

    5 months ago
    What is the error here? Is still only one flow run being created?
    Patrick Tan

    5 months ago
    This is the error:
    [2022-04-01 15:50:49-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
    [2022-04-01 15:50:49-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[0]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs) # type: ignore
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in create_flow_run
        "Both flow_id and flow_name are null. You must pass a flow "
    ValueError: Both flow_id and flow_name are null. You must pass a flow identifier
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
    [2022-04-01 15:50:49-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs) # type: ignore
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in create_flow_run
        "Both flow_id and flow_name are null. You must pass a flow "
    ValueError: Both flow_id and flow_name are null. You must pass a flow identifier
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
    [2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
    [2022-04-01 15:50:50-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
    [2022-04-01 15:50:50-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
    [2022-04-01 15:50:50-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    When running the child flow, the flow name and project name are the same; only the parameters differ. So I only map the parameters:
    mapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])
    Kevin Kho

    5 months ago
    create_flow_run needs either the flow_id, or the flow_name and project_name. I don't think you supplied either?
    Patrick Tan

    5 months ago
    I provide create_flow_run.map with flow_name, project_name, and parameters (parameters is a list of 2 dictionaries). Then I follow it with create_flow_run, passing the flow_id from create_flow_run.map and an idempotency_key. The error shows the child flow failing to run twice:
    with Flow("parent-flow") as flow:
        #filelist = get_filelist_task()
    
        mapped_flow_run_ids = create_flow_run.map(flow_name="livelots flow",
                                                  project_name="LiveLots-ETL",
                                                  parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                              {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                                  )
    
        flow_a = create_flow_run(flow_id=mapped_flow_run_ids,
                                idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
    
    
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    
        flow.run()
    /Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/bin/python /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/flows/schedule_flow.py /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml wp-ahdata-sandbox dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv
    [2022-04-01 16:16:34-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
    [2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
    [2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
    [2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
    [2022-04-01 16:16:35-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[0]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs) # type: ignore
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
        flow = FlowView.from_flow_name(flow_name, project_name=project_name)
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
        order_by={"created": EnumValue("desc")},
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
        f"No results found while querying for flows where {where!r}"
    ValueError: No results found while querying for flows where {'name': {'_eq': 'l'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'L'}}}
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
    [2022-04-01 16:16:35-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs) # type: ignore
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
        flow = FlowView.from_flow_name(flow_name, project_name=project_name)
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
        order_by={"created": EnumValue("desc")},
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
        f"No results found while querying for flows where {where!r}"
    ValueError: No results found while querying for flows where {'name': {'_eq': 'i'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'i'}}}
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
    [2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
    [2022-04-01 16:16:35-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    Process finished with exit code 0
    Kevin Kho

    5 months ago
    This looks off because
    create_flow_run.map(flow_name="livelots flow",
                        project_name="LiveLots-ETL",
    treats the strings as iterables so you end up mapping over the letters of the string. Can you try with unmapped?
    create_flow_run.map(flow_name=unmapped("livelots flow"),
                        project_name=unmapped("LiveLots-ETL"),
    and using
    from prefect import unmapped
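A plain-Python analogy for the "mapping over the letters" behaviour described above. This is not Prefect's implementation: `Unmapped` and `toy_map` are hypothetical stand-ins that only mimic the idea of iterating every argument unless it is explicitly wrapped. The flow and project names come from the thread; the parameter dicts are placeholders.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical marker meaning 'pass this value whole to every call'."""

    def __init__(self, value):
        self.value = value


def toy_map(func, **kwargs):
    """Call func once per element, iterating every argument not wrapped in Unmapped.

    At least one argument must be a finite iterable, because wrapped
    values are repeated indefinitely.
    """
    iterables = {
        name: repeat(arg.value) if isinstance(arg, Unmapped) else iter(arg)
        for name, arg in kwargs.items()
    }
    names = list(iterables)
    return [func(**dict(zip(names, values))) for values in zip(*iterables.values())]


def describe(flow_name, project_name, parameters):
    return (flow_name, project_name, parameters)


params = [{"n": 1}, {"n": 2}]  # placeholder parameter sets

# Bare strings are iterables too, so each call receives a single character.
broken = toy_map(describe, flow_name="livelots flow",
                 project_name="LiveLots-ETL", parameters=params)
print(broken)  # [('l', 'L', {'n': 1}), ('i', 'i', {'n': 2})]

# Wrapping the constants keeps them whole for every call.
fixed = toy_map(describe, flow_name=Unmapped("livelots flow"),
                project_name=Unmapped("LiveLots-ETL"), parameters=params)
print(fixed)
```

The `broken` result lines up with the failed queries in the log above, which looked for flows named 'l' and 'i' in projects named 'L' and 'i'.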
    Patrick Tan

    5 months ago
    There is some success with using the unmapped function. The child flow ran twice with 2 sets of parameters (as expected). However, the runs happened at the same time with the same flow ID; I expected them to run one at a time. I also get an error message:
    with Flow("parent-flow") as flow:
        #filelist = get_filelist_task()
    
        mapped_flow_run_ids = create_flow_run.map(flow_name=unmapped("livelots flow"),
                                                  project_name=unmapped("LiveLots-ETL"),
                                                  idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"),
                                                  parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                              {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                               "bucket":"wp-ahdata-sandbox",
                                                               "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                                  )
    
    
        wait_for_flow_a = wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
    
        flow.run()
    [2022-04-01 16:39:42-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
    [2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
    [2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
    [2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
    [2022-04-01 16:39:42-0400] INFO - prefect.create_flow_run[0] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
    [2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[0] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/b77f3d80-a2d1-4ac5-bb2c-4d571bbc6650
    [2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
    [2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
    [2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[1] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
    [2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[1] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/d47e6fa6-d333-46f9-b9a9-89774aef715b
    [2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
    [2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
    [2022-04-01 16:39:44-0400] ERROR - prefect.TaskRunner | Task 'wait_for_flow_run': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs) # type: ignore
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
        flow_run = FlowRunView.from_flow_run_id(flow_run_id)
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 566, in from_flow_run_id
        flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 608, in _query_for_flow_run
        result = client.graphql(flow_run_query)
      File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/client/client.py", line 473, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
    [2022-04-01 16:39:44-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'Failed'
    [2022-04-01 16:39:44-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    Kevin Kho

    5 months ago
    Looking again, I think the idempotency key is off too: it's a single string, so it likely gets mapped over character by character, the same way the flow name was. You might need to supply something of type List[str] instead.
    Patrick Tan

    5 months ago
    So the number of idempotency keys should be the same as the number of parameter sets?
    Kevin Kho

    5 months ago
    yes for sure and they need to be distinct
    wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
    you need to map this too btw and use unmapped for the True
    Because that task just takes in 1 id
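A toy illustration of the "that task just takes in 1 id" point, in plain Python rather than Prefect. `wait_for_one` is a hypothetical stand-in for a task that accepts exactly one flow run ID, and the IDs are made up. Passing the whole list fails, while calling it once per ID (which is what mapping does) succeeds.

```python
from typing import List


def wait_for_one(flow_run_id: str, raise_final_state: bool = True) -> str:
    """Hypothetical stand-in for a task that accepts exactly one flow run ID."""
    if not isinstance(flow_run_id, str):
        raise TypeError(
            f"expected a single ID string, got {type(flow_run_id).__name__}"
        )
    return f"waited for {flow_run_id}"


flow_run_ids: List[str] = ["run-id-1", "run-id-2"]  # made-up IDs

# Passing the whole list mirrors the "expected String, but encountered Array" error.
try:
    wait_for_one(flow_run_ids)
    accepted_whole_list = True
except TypeError:
    accepted_whole_list = False

# One call per ID, with the constant flag repeated for every call.
waited = [wait_for_one(run_id, raise_final_state=True) for run_id in flow_run_ids]

print(accepted_whole_list)  # False
print(waited)  # ['waited for run-id-1', 'waited for run-id-2']
```

In the thread's terms, the per-ID calls correspond to mapping wait_for_flow_run over mapped_flow_run_ids while wrapping the True flag in unmapped, as Kevin describes.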
    Patrick Tan

    5 months ago
    Thanks for all your help. Not quite done, but making progress. I will reach out next week if there is still an issue. Have a good weekend!