
Patrick Tan

04/01/2022, 12:00 PM
Hi, Flow A repeatedly (in a loop) runs Flow B. I want to run the next instance of Flow B only after the current Flow B run has completed. I ran the code below, and 2 instances of the same flow ran at the same time with the same flow-id (see screenshot).
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:

    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        print(i)

Anna Geller

04/01/2022, 12:41 PM
A couple of issues here:
1. You can't run for loops in Prefect 1.0. This is possible in Prefect 2.0, but in 1.0 you can only construct your DAG within the Flow, i.e., call your tasks; no for loops allowed. Technically your for loop works because it iterates over a static list of values, but it's still not something you should do in 1.0. Mapping would be a better approach; check the example below.
2. The print statement is also not allowed: it executes once while the flow is being built, not when the flow runs.
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
And actually, regarding your issue: it's most likely due to the idempotency key. If you add idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"), this should work:
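from datetime import datetime
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:
    for i in range(2):
        # datetime.now() is evaluated while the flow is being built, once per loop iteration
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)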
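Anna's second point comes down to when code runs: in Prefect 1.0 the body of with Flow(...) executes once, while the DAG is being built, and tasks only execute later, when the flow is run. The sketch below is a toy model of that split (ToyFlow and record are hypothetical, not Prefect's classes or tasks); the events list records the order in which things happen, with the loop-body append playing the role of print(i).

```python
from typing import Any, Callable, List


class ToyFlow:
    """Hypothetical model of a Prefect 1.0 flow: task calls are recorded, not run."""

    def __init__(self) -> None:
        self._calls: List[Callable[[], Any]] = []

    def add(self, func: Callable[..., Any], *args: Any) -> None:
        # Only remember the call; nothing executes until run().
        self._calls.append(lambda: func(*args))

    def run(self) -> List[Any]:
        return [call() for call in self._calls]


events: List[str] = []


def record(x: int) -> int:
    """Stand-in task body that logs when it actually executes."""
    events.append(f"task ran with {x}")
    return x


flow = ToyFlow()
for i in range(2):
    flow.add(record, i)                       # like calling a task inside `with Flow`
    events.append(f"loop body ran for {i}")  # like print(i): runs at build time

events.append("flow built")
results = flow.run()
print(events)
# ['loop body ran for 0', 'loop body ran for 1', 'flow built', 'task ran with 0', 'task ran with 1']
```

Both "loop body" entries appear before any task runs, which is why a print inside the flow block does not report anything about the actual run.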

Patrick Tan

04/01/2022, 1:11 PM
I added the code, and now the child flow only executes once; I expect it to run twice (for i in range(2)).
@Anna Geller, I missed your first response; let me review it first.

Kevin Kho

04/01/2022, 1:50 PM
Yeah, this looks like an idempotency key issue. The docstring for the idempotency_key argument of the task gives a good description of what is happening.
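As a rough mental model of what that docstring describes, the sketch below treats the backend as a dictionary from idempotency key to flow run: a repeated key returns the existing run instead of creating a new one. ToyBackend is hypothetical, not Prefect's API, and the fixed datetime is an assumed stand-in for datetime.now(), since two calls made a fraction of a second apart usually format to the same second-resolution string.

```python
import itertools
from datetime import datetime
from typing import Dict


class ToyBackend:
    """Hypothetical backend that deduplicates flow-run creation by idempotency key."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}
        self._ids = itertools.count(1)

    def create_flow_run(self, idempotency_key: str) -> str:
        # A key seen before returns the existing run instead of a new one.
        if idempotency_key not in self._runs_by_key:
            self._runs_by_key[idempotency_key] = f"run-{next(self._ids)}"
        return self._runs_by_key[idempotency_key]


# Stand-in for datetime.now(): both loop iterations format the same second.
built_at = datetime(2022, 4, 1, 12, 41, 0)
stamp = built_at.strftime("%m/%d/%Y:%H:%M:%S")

backend = ToyBackend()
same_keys = [stamp for _ in range(2)]
print([backend.create_flow_run(k) for k in same_keys])  # ['run-1', 'run-1']

# Adding the loop index makes the keys distinct, so two runs are created.
distinct_keys = [f"{stamp}-{i}" for i in range(2)]
print([backend.create_flow_run(k) for k in distinct_keys])  # ['run-2', 'run-3']
```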

Patrick Tan

04/01/2022, 3:34 PM
@Kevin Kho, I have a parent flow with a task that returns a list of files; then, for each file, I want to run a child flow, passing the filename as a parameter. I am scratching my head since a task is not iterable. Do you have any idea or sample code?

Kevin Kho

04/01/2022, 3:36 PM
You can use create_flow_run.map(), which is analogous to a for-loop. You just need to supply unique idempotency keys.
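Kevin's point that .map() is analogous to a for-loop can be made concrete in plain Python. The names toy_map, Unmapped, and fake_create_flow_run below are hypothetical stand-ins, not Prefect's implementation: each mapped argument contributes one element per child call, a value wrapped in Unmapped is passed unchanged to every call, and stopping at the shortest mapped argument is an assumption of this sketch.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value to broadcast."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(func: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call func once per element of the mapped kwargs (simplified model).

    Mapped kwargs are paired element-wise (stopping at the shortest one);
    Unmapped kwargs are passed unchanged to every call.
    """
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    fixed = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("at least one argument must be mapped")
    names = list(mapped)
    return [func(**dict(zip(names, values)), **fixed) for values in zip(*mapped.values())]


def fake_create_flow_run(flow_name: str, project_name: str) -> str:
    """Hypothetical stand-in that just labels the child run it would create."""
    return f"{project_name}/{flow_name}"


runs = toy_map(
    fake_create_flow_run,
    flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
    project_name=Unmapped("your_project_name"),
)
print(runs)
# ['your_project_name/flow_name_1', 'your_project_name/flow_name_2', 'your_project_name/flow_name_3']
```

The equivalent for-loop would call fake_create_flow_run once per flow name with the same project name each time; mapping just expresses that loop declaratively.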

Anna Geller

04/01/2022, 3:47 PM
from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from datetime import timedelta


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
    return "some_data"


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_1(x):
    # some cleaning
    return x


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_2(x):
    # some cleaning
    return x


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_15(x):
    # some cleaning
    return x


with Flow("clean all files", executor=LocalExecutor()) as flow:
    all_files = Parameter(
        "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
    )
    data = read_data_from_file.map(all_files)
    transformed_data = etl_1.map(data)
    transformed_data = etl_2.map(transformed_data)
    transformed_data = etl_15.map(transformed_data)
You could basically do the same, but instead of etl_1.map(data), it would be create_flow_run.map(args).
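For the list-of-files case, a minimal sketch of the step that turns a file list into per-child inputs might look like this. build_child_runs, the file_name parameter, and the run_label scheme are all hypothetical; the point is simply that every child gets its own parameters dict and its own distinct idempotency key, in two lists of equal length.

```python
from typing import Dict, List, Tuple


def build_child_runs(
    files: List[str], run_label: str
) -> Tuple[List[Dict[str, str]], List[str]]:
    """Return one parameters dict and one distinct idempotency key per file.

    "file_name" is a hypothetical child-flow parameter name, and run_label is
    any string identifying the parent run (for example, a timestamp).
    """
    parameters = [{"file_name": f} for f in files]
    keys = [f"{run_label}-{i}-{f}" for i, f in enumerate(files)]
    return parameters, keys


params, keys = build_child_runs(["file1.csv", "file2.csv"], run_label="parent-run")
print(params)  # [{'file_name': 'file1.csv'}, {'file_name': 'file2.csv'}]
print(keys)    # ['parent-run-0-file1.csv', 'parent-run-1-file2.csv']
```

Inside a Prefect 1.0 flow, logic like this would live in a task, and its outputs would feed the mapped parameters and idempotency_key arguments of create_flow_run.map().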

Patrick Tan

04/01/2022, 6:26 PM
@Anna Geller, can you tell me what's wrong with my code? I used create_flow_run.map over all the files, which are passed as parameters, and then called create_flow_run. Sorry, I am new to this:
with Flow("parent-flow") as flow:
    mapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])
    flow_a = create_flow_run(flow_id=mapped_flow_run_ids, flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow.run()

Kevin Kho

04/01/2022, 6:31 PM
What is the error here? Is only one wait still being created?

Patrick Tan

04/01/2022, 7:53 PM
This is the error:
[2022-04-01 15:50:49-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
[2022-04-01 15:50:49-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in create_flow_run
    "Both `flow_id` and `flow_name` are null. You must pass a flow "
ValueError: Both `flow_id` and `flow_name` are null. You must pass a flow identifier
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 15:50:49-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in create_flow_run
    "Both `flow_id` and `flow_name` are null. You must pass a flow "
ValueError: Both `flow_id` and `flow_name` are null. You must pass a flow identifier
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 15:50:49-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 15:50:50-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 15:50:50-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 15:50:50-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks faile
When running the child flow, the flow name and project are the same; only the parameters differ. So I only map the parameters:
mapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                       "bucket":"wp-ahdata-sandbox",
                                                       "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                      {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                       "bucket":"wp-ahdata-sandbox",
                                                       "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])

Kevin Kho

04/01/2022, 8:06 PM
create_flow_run needs either the flow_id, or the flow_name and project_name. I don't think you supply them?

Patrick Tan

04/01/2022, 8:27 PM
I provided create_flow_run.map with flow_name, project_name, and parameters (parameters is a list of 2 dictionaries). Then I followed it with create_flow_run, using the flow_id from create_flow_run.map and an idempotency_key. The error shows a failure to run the child flow twice:
with Flow("parent-flow") as flow:
    #filelist = get_filelist_task()

    mapped_flow_run_ids = create_flow_run.map(flow_name="livelots flow",
                                              project_name="LiveLots-ETL",
                                              parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                              )

    flow_a = create_flow_run(flow_id=mapped_flow_run_ids,
                            idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))


    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)

    flow.run()
/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/bin/python /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/flows/schedule_flow.py /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml wp-ahdata-sandbox dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv
[2022-04-01 16:16:34-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2022-04-01 16:16:34-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
[2022-04-01 16:16:35-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
    order_by={"created": EnumValue("desc")},
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
    f"No results found while querying for flows where {where!r}"
ValueError: No results found while querying for flows where {'name': {'_eq': 'l'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'L'}}}
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 16:16:35-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
    order_by={"created": EnumValue("desc")},
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
    f"No results found while querying for flows where {where!r}"
ValueError: No results found while querying for flows where {'name': {'_eq': 'i'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'i'}}}
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 16:16:35-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 16:16:35-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Process finished with exit code 0

Kevin Kho

04/01/2022, 8:30 PM
This looks off because
create_flow_run.map(flow_name="livelots flow",
                    project_name="LiveLots-ETL",
treats the strings as iterables, so you end up mapping over the letters of each string. Can you try with unmapped?
create_flow_run.map(flow_name=unmapped("livelots flow"),
                    project_name=unmapped("LiveLots-ETL"),
after importing it with
from prefect import unmapped
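The 'l'/'L' and 'i'/'i' values in the earlier error log are exactly what an element-wise zip over the mapped arguments produces when two of them are plain strings. The plain-Python illustration below reproduces those pairs; it mimics the behaviour seen in the log and is not Prefect's mapping code, and the parameter dicts are placeholders.

```python
flow_name = "livelots flow"
project_name = "LiveLots-ETL"
parameters = [{"file": "first"}, {"file": "second"}]  # placeholder parameter sets

# Mapped arguments are paired element-wise; a str iterates character by
# character, and zip stops at the shortest input (here: 2 parameter sets).
children = list(zip(flow_name, project_name, parameters))
for name, project, params in children:
    print(repr(name), repr(project), params)
# 'l' 'L' {'file': 'first'}
# 'i' 'i' {'file': 'second'}

# Broadcasting the constants (what unmapped is for) keeps them whole.
fixed = [(flow_name, project_name, params) for params in parameters]
print(fixed[0][:2])  # ('livelots flow', 'LiveLots-ETL')
```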

Patrick Tan

04/01/2022, 8:47 PM
There is some success with using the unmapped function. The child flow ran twice with the 2 sets of parameters (as expected). However, both ran at the same time with the same flow-id, whereas I expected them to run one at a time. I also get an error message:
with Flow("parent-flow") as flow:
    #filelist = get_filelist_task()

    mapped_flow_run_ids = create_flow_run.map(flow_name=unmapped("livelots flow"),
                                              project_name=unmapped("LiveLots-ETL"),
                                              idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"),
                                              parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
                                                          {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
                                                           "bucket":"wp-ahdata-sandbox",
                                                           "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
                                              )


    wait_for_flow_a = wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)

    flow.run()
[2022-04-01 16:39:42-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2022-04-01 16:39:42-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
[2022-04-01 16:39:42-0400] INFO - prefect.create_flow_run[0] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
[2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[0] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/b77f3d80-a2d1-4ac5-bb2c-4d571bbc6650
[2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
[2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[1] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
[2022-04-01 16:39:43-0400] INFO - prefect.create_flow_run[1] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/d47e6fa6-d333-46f9-b9a9-89774aef715b
[2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
[2022-04-01 16:39:43-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 16:39:44-0400] ERROR - prefect.TaskRunner | Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 566, in from_flow_run_id
    flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 608, in _query_for_flow_run
    result = client.graphql(flow_run_query)
  File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/client/client.py", line 473, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
[2022-04-01 16:39:44-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'Failed'
[2022-04-01 16:39:44-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Kevin Kho

04/01/2022, 8:48 PM
Looking again, I think the idempotency key is also off: it's a single string, so you might need to supply something of type List[str].

Patrick Tan

04/01/2022, 8:49 PM
So the number of idempotency keys should be the same as the number of parameter sets?

Kevin Kho

04/01/2022, 8:49 PM
Yes, for sure, and they need to be distinct.
wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
You need to map this too, by the way, and use unmapped for the True, because that task only takes in one ID.
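Kevin's two points can be illustrated in plain Python: the keys form a list with one distinct entry per parameter set, and a function that accepts a single ID has to be called once per ID rather than handed the whole list. toy_wait_for_flow_run, the placeholder parameters, and the run IDs are hypothetical stand-ins; in the real flow, the mapped call would presumably be wait_for_flow_run.map(mapped_flow_run_ids, raise_final_state=unmapped(True)).

```python
from typing import Dict, List


def toy_wait_for_flow_run(flow_run_id: str) -> str:
    """Hypothetical stand-in: like wait_for_flow_run, it accepts exactly one id."""
    if not isinstance(flow_run_id, str):
        raise TypeError(f"expected a single flow run id, got {type(flow_run_id).__name__}")
    return f"waited on {flow_run_id}"


parameters: List[Dict[str, str]] = [{"file": "first"}, {"file": "second"}]  # placeholders

# One distinct idempotency key per parameter set (a List[str], not a single str).
keys = [f"parent-run-{i}" for i in range(len(parameters))]
print(keys)  # ['parent-run-0', 'parent-run-1']

flow_run_ids = ["run-a", "run-b"]  # hypothetical ids returned by the mapped create step

# Passing the whole list fails, much like the 'encountered Array' error in the log.
try:
    toy_wait_for_flow_run(flow_run_ids)  # deliberately passing a list
except TypeError as exc:
    print(exc)  # expected a single flow run id, got list

# Mapping means one call per id.
results = [toy_wait_for_flow_run(run_id) for run_id in flow_run_ids]
print(results)  # ['waited on run-a', 'waited on run-b']
```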

Patrick Tan

04/01/2022, 9:15 PM
Thanks for all your help. Not quite done, but making progress. I will reach out next week if there are still issues. Have a good weekend!