Patrick Tan
04/01/2022, 12:00 PM
with Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        print(i)
Anna Geller
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor
with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
mapped_flow_run_ids = create_flow_run.map(
flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
project_name=unmapped("your_project_name"),
)
`idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S")`, this should work:
with Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
Patrick Tan
04/01/2022, 1:11 PM
Kevin Kho
Patrick Tan
04/01/2022, 3:34 PM
Kevin Kho
`create_flow_run.map()` and it’s analogous to a for-loop. You just need to supply unique idempotency keys.
Anna Geller
from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from datetime import timedelta
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
return "some_data"
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_1(x):
# some cleaning
return x
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_2(x):
# some cleaning
return x
@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_15(x):
# some cleaning
return x
with Flow("clean all files", executor=LocalExecutor()) as flow:
all_files = Parameter(
"files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
)
data = read_data_from_file.map(all_files)
transformed_data = etl_1.map(data)
transformed_data = etl_2.map(transformed_data)
transformed_data = etl_15.map(transformed_data)
You could basically do the same, but instead of `etl_1.map(data)`, it would be `create_flow_run.map(args)`.
Patrick Tan
04/01/2022, 6:26 PM
Kevin Kho
Patrick Tan
04/01/2022, 7:53 PM
"Both `flow_id` and `flow_name` are null. You must pass a flow "
ValueError: Both `flow_id` and `flow_name` are null. You must pass a flow identifier
[2022-04-01 155049-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
[2022-04-01 155049-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 155049-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in create_flow_run
"Both `flow_id` and `flow_name` are null. You must pass a flow "
ValueError: Both `flow_id` and `flow_name` are null. You must pass a flow identifier
[2022-04-01 155049-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
[2022-04-01 155049-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 155049-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 155050-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 155050-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 155050-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
mapped_flow_run_ids = create_flow_run.map(parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
    "bucket":"wp-ahdata-sandbox",
    "prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
    {"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
    "bucket":"wp-ahdata-sandbox",
    "prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}])
Kevin Kho
`create_flow_run` needs either the flow_id or flow_name and project_name. I don’t think you supply it?
Patrick Tan
04/01/2022, 8:27 PM
with Flow("parent-flow") as flow:
#filelist = get_filelist_task()
mapped_flow_run_ids = create_flow_run.map(flow_name="livelots flow",
project_name="LiveLots-ETL",
parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
"bucket":"wp-ahdata-sandbox",
"prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
"bucket":"wp-ahdata-sandbox",
"prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
)
flow_a = create_flow_run(flow_id=mapped_flow_run_ids,
idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
flow.run()
/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/bin/python /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/flows/schedule_flow.py /Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml wp-ahdata-sandbox dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv
[2022-04-01 161634-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2022-04-01 161634-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 161634-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2022-04-01 161634-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
[2022-04-01 161635-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[0]': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
order_by={"created": EnumValue("desc")},
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
f"No results found while querying for flows where {where!r}"
ValueError: No results found while querying for flows where {'name': {'_eq': 'l'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'L'}}}
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Failed'
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 161635-0400] ERROR - prefect.TaskRunner | Task 'create_flow_run[1]': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 130, in create_flow_run
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 207, in from_flow_name
order_by={"created": EnumValue("desc")},
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow.py", line 301, in _query_for_flows
f"No results found while querying for flows where {where!r}"
ValueError: No results found while querying for flows where {'name': {'_eq': 'i'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'i'}}}
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Failed'
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 161635-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2022-04-01 161635-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Process finished with exit code 0
Kevin Kho
create_flow_run.map(flow_name="livelots flow",
                    project_name="LiveLots-ETL",
treats the strings as iterables, so you end up mapping over the letters of the string.
Can you try with unmapped?
create_flow_run.map(flow_name=unmapped("livelots flow"),
                    project_name=unmapped("LiveLots-ETL"),
and using `from prefect import unmapped`
Patrick Tan
04/01/2022, 8:47 PM
with Flow("parent-flow") as flow:
#filelist = get_filelist_task()
mapped_flow_run_ids = create_flow_run.map(flow_name=unmapped("livelots flow"),
project_name=unmapped("LiveLots-ETL"),
idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"),
parameters=[{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
"bucket":"wp-ahdata-sandbox",
"prefix":"dot-scrapy/live-lots/grays/test-live-lots-raw.tsv"},
{"config_file":"/Users/patricktan/bitbucket/data/live-lots-etl/live-lots-etl-batch/config-dev-test.yaml",
"bucket":"wp-ahdata-sandbox",
"prefix":"dot-scrapy/live-lots/Blackwell/20220325_180155_Blackwell-DS=420.tsv"}]
)
wait_for_flow_a = wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
flow.run()
[2022-04-01 163942-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2022-04-01 163942-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-04-01 163942-0400] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2022-04-01 163942-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Starting task run...
[2022-04-01 163942-0400] INFO - prefect.create_flow_run[0] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
[2022-04-01 163943-0400] INFO - prefect.create_flow_run[0] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/b77f3d80-a2d1-4ac5-bb2c-4d571bbc6650
[2022-04-01 163943-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
[2022-04-01 163943-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Starting task run...
[2022-04-01 163943-0400] INFO - prefect.create_flow_run[1] | Creating flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow' for flow 'livelots flow'...
[2022-04-01 163943-0400] INFO - prefect.create_flow_run[1] | Created flow run '78d36f6c-53ba-4865-bf6a-8d88f2f6e16e-livelots flow': https://cloud.prefect.io/patrick-tan-worthpoint-com-s-account/flow-run/d47e6fa6-d333-46f9-b9a9-89774aef715b
[2022-04-01 163943-0400] INFO - prefect.TaskRunner | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
[2022-04-01 163943-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2022-04-01 163944-0400] ERROR - prefect.TaskRunner | Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
flow_run = FlowRunView.from_flow_run_id(flow_run_id)
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 566, in from_flow_run_id
flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 608, in _query_for_flow_run
result = client.graphql(flow_run_query)
File "/Users/patricktan/bitbucket/data/live-lots-etl/venv-livelotsetl/lib/python3.7/site-packages/prefect/client/client.py", line 473, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
[2022-04-01 163944-0400] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'Failed'
[2022-04-01 163944-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Kevin Kho
List[str]
Patrick Tan
04/01/2022, 8:49 PM
Kevin Kho
wait_for_flow_run(mapped_flow_run_ids, raise_final_state=True)
You need to map this too, btw, and use unmapped for the True, e.g. `wait_for_flow_run.map(mapped_flow_run_ids, raise_final_state=unmapped(True))`.
Patrick Tan
04/01/2022, 9:15 PM