Ifeanyi Okwuchi
02/08/2022, 10:39 PMcreate_flow_run
to run a flow multiple times with different parameters. Here is my code and the error i'm getting.Kevin Kho
create_flow_run
hits the GraphQL API and triggers a flow run backed by CloudIfeanyi Okwuchi
02/08/2022, 11:16 PMimport prefect
from prefect import task, Flow, Parameter
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor
@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
print(f"hello {user_input}!. You're {age_input} today")
with Flow("dummy-flow-to-be-run-multiple-times") as flow:
user_param = Parameter("user_input", default="world")
age_param = Parameter("age_input", default=18)
hw = hello_world(user_param, age_param)
with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
parameters = [dict(user_input="Prefect", age_input=21),
dict(user_input="Marvin", age_input=27),
dict(user_input="World", age_input=12),
]
mapped_flows = create_flow_run.map(
parameters=parameters,
flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
)
Kevin Kho
Ifeanyi Okwuchi
02/08/2022, 11:17 PMKevin Kho
create_flow_run
flow.run()
then at least the dummy one.Ifeanyi Okwuchi
02/08/2022, 11:20 PMflow.register()
after the first flow but I still get the same errorKevin Kho
Ifeanyi Okwuchi
02/08/2022, 11:22 PM@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
print(f"hello {user_input}!. You're {age_input} today")
with Flow("dummy-flow-to-be-run-multiple-times") as flow:
user_param = Parameter("user_input", default="world")
age_param = Parameter("age_input", default=18)
hw = hello_world(user_param, age_param)
flow.register()
with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
parameters = [dict(user_input="Prefect", age_input=21),
dict(user_input="Marvin", age_input=27),
dict(user_input="World", age_input=12),
]
mapped_flows = create_flow_run.map(
parameters=parameters,
flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
)
Kevin Kho
Ifeanyi Okwuchi
02/08/2022, 11:24 PMKevin Kho
create_flow_run
for local execution. The code underneath hits the Prefect Cloud API. Prefect 2.0 (Orion) will let you do this but this is a limitation in current PrefectIfeanyi Okwuchi
02/08/2022, 11:26 PMflow.register(project_name=PROJECT)
Kevin Kho
Ifeanyi Okwuchi
02/08/2022, 11:37 PMCollecting flows...
/Users/ifeanyi/Development/prefect-measurement-ml/containers/prefect_test/prefect_test/test_hello_world.py:28: UserWarning: Attempting to call `flow.register` during execution of flow file will lead to unexpected results.
flow.register(project_name=PROJECT)
Processing 'prefect_test/test_hello_world.py':
Building `Local` storage...
Registering 'mapped_flows'... Done
└── ID: 778f4497-bba4-4b50-b7eb-703d189b9085
└── Version: 1
======================== 1 registered ========================
Kevin Kho
flow.run()
in the same script. The default storage will not build a Docker image for you. You can check the Storage docs for more information. You want to be using Docker storageIfeanyi Okwuchi
02/08/2022, 11:42 PMflow.run()
in my script
PROJECT = "seller-ds-production"
# Docker storage is the most flexible for managing your dependencies
storage = Docker(
dockerfile="Dockerfile",
registry_url=f"<http://us.gcr.io/{PROJECT}|us.gcr.io/{PROJECT}>",
image_name="test_prefect_cloud",
)
@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
print(f"hello {user_input}!. You're {age_input} today")
with Flow("dummy-flow-to-be-run-multiple-times") as flow:
user_param = Parameter("user_input", default="world")
age_param = Parameter("age_input", default=18)
hw = hello_world(user_param, age_param)
flow.register(project_name=PROJECT)
with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
parameters = [dict(user_input="Prefect", age_input=21),
dict(user_input="Marvin", age_input=27),
dict(user_input="World", age_input=12),
]
mapped_flows = create_flow_run.map(
parameters=parameters,
flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
)
Kevin Kho
flow.storage = storage
or Flow("…", storage=…)
Ifeanyi Okwuchi
02/08/2022, 11:44 PMKevin Kho
Ifeanyi Okwuchi
02/09/2022, 12:17 AM