# prefect-community
i
I'm running the following flow using `create_flow_run` to run a flow multiple times with different parameters. Here is my code and the error I'm getting.
k
You need to register the Flow because `create_flow_run` hits the GraphQL API and triggers a flow run backed by Cloud.
Could you move the code and traceback to the thread to keep the main channel clean?
Oops sorry, I meant to move the traceback but didn't copy it
i
import prefect
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
    print(f"hello {user_input}!. You're {age_input} today")


with Flow("dummy-flow-to-be-run-multiple-times") as flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_param, age_param)


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect", age_input=21),
                  dict(user_input="Marvin", age_input=27),
                  dict(user_input="World", age_input=12),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
    )
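For readers following along: `create_flow_run.map` starts one child run per entry in `parameters`, while `unmapped(...)` repeats the same flow name for each of them. The plain-Python sketch below mimics only that fan-out shape. `fan_out` is a hypothetical helper for illustration and does not call Prefect.

```python
from typing import Any, Dict, List


def fan_out(parameters: List[Dict[str, Any]], flow_name: str) -> List[Dict[str, Any]]:
    """Pair every parameter set with the same (unmapped) flow name."""
    return [{"flow_name": flow_name, "parameters": p} for p in parameters]


users = ["Prefect", "Marvin", "World"]
ages = [21, 27, 12]
# Same list of dicts as in the flow above, built from two columns.
parameters = [dict(user_input=u, age_input=a) for u, a in zip(users, ages)]

calls = fan_out(parameters, "dummy-flow-to-be-run-multiple-times")
for call in calls:
    print(call)
```

Each printed entry corresponds to one child flow run that the mapped task would request.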
k
Thank you!
i
which of the flows should i register?
k
The one called by `create_flow_run`.
Well, both actually for production, but if you are using `flow.run()` then at least the dummy one.
i
I added a `flow.register()` after the first flow but I still get the same error.
k
Could you show me the new code?
i
@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
    print(f"hello {user_input}!. You're {age_input} today")


with Flow("dummy-flow-to-be-run-multiple-times") as flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_param, age_param)

flow.register()

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect", age_input=21),
                  dict(user_input="Marvin", age_input=27),
                  dict(user_input="World", age_input=12),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
    )
k
This register call won’t work, right? You need to supply a project name to register with Prefect Cloud.
i
how about when running locally?
k
You can’t use `create_flow_run` for local execution. The code underneath hits the Prefect Cloud API. Prefect 2.0 (Orion) will let you do this, but it is a limitation in current Prefect.
i
For the cloud, I now have `flow.register(project_name=PROJECT)`
k
Yes exactly, and now you can do the `create_flow_run`.
i
Now I have this warning and it doesn't seem to build a Docker image:
Collecting flows...
/Users/ifeanyi/Development/prefect-measurement-ml/containers/prefect_test/prefect_test/test_hello_world.py:28: UserWarning: Attempting to call `flow.register` during execution of flow file will lead to unexpected results.
 flow.register(project_name=PROJECT)
Processing 'prefect_test/test_hello_world.py':
 Building `Local` storage...
 Registering 'mapped_flows'... Done
 └── ID: 778f4497-bba4-4b50-b7eb-703d189b9085
 └── Version: 1
======================== 1 registered ========================
k
That warning is fine, but it’s because you have `flow.run()` in the same script. The default storage will not build a Docker image for you. You can check the Storage docs for more information. You want to be using Docker storage.
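A side note on the warning text: it points at the `flow.register(...)` call running while the file is being loaded. A common way to keep such calls out of a tool's import of the file is an `if __name__ == "__main__":` guard. The sketch below shows only that Python mechanism with the standard library; it assumes the loading tool executes the file under a module name other than `__main__`, and the `registered` list is a hypothetical stand-in for the side effect of registering.

```python
import runpy
import tempfile
import textwrap
from pathlib import Path

# A tiny stand-in for a flow file; no Prefect is involved here.
script = textwrap.dedent(
    '''
    registered = []  # stands in for the side effect of flow.register()

    if __name__ == "__main__":
        registered.append("mapped_flows")
    '''
)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "flow_file.py"
    path.write_text(script)
    # Loaded by a tool under some other module name: the guard is skipped.
    loaded = runpy.run_path(str(path), run_name="loaded_by_tool")
    # Executed directly, as with `python flow_file.py`: the guard runs.
    direct = runpy.run_path(str(path), run_name="__main__")

print(loaded["registered"])
print(direct["registered"])
```

With the guard in place, the guarded call runs only when the script is executed directly.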
i
I have storage specified now and I don't have `flow.run()` in my script:
PROJECT = "seller-ds-production"
# Docker storage is the most flexible for managing your dependencies
storage = Docker(
    dockerfile="Dockerfile",
    registry_url=f"us.gcr.io/{PROJECT}",
    image_name="test_prefect_cloud",
)

@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
    print(f"hello {user_input}!. You're {age_input} today")


with Flow("dummy-flow-to-be-run-multiple-times") as flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_param, age_param)

flow.register(project_name=PROJECT)

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect", age_input=21),
                  dict(user_input="Marvin", age_input=27),
                  dict(user_input="World", age_input=12),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
    )
k
You have storage but it’s not attached to the Flow 😅 You can do `flow.storage = storage` or `Flow("…", storage=…)`.
i
I had this and it still didn't build the docker image
oh crap
I have to do this for both flows?
k
I think they need to be different images (for now), otherwise one will be overridden. There is a way to package them in the same container though.
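If separate images are the route taken, one simple way to avoid two flows pushing to the same image is to derive the image name from the flow name. This is only a sketch: `image_name_for` is a hypothetical helper, not part of Prefect, and its result would be passed as the `image_name` argument of the `Docker` storage used above.

```python
import re


def image_name_for(flow_name: str) -> str:
    """Lowercase the flow name and collapse other characters into hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", flow_name.lower()).strip("-")


flow_names = ["dummy-flow-to-be-run-multiple-times", "mapped_flows"]
image_names = {name: image_name_for(name) for name in flow_names}

for flow_name, image_name in image_names.items():
    print(f"{flow_name} -> {image_name}")
```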
i
So I packaged them in the same container and it works. The flows run successfully, but I do not see the print statements in the logs.
I changed from print to the Prefect logger and there is still no output in the logs.
Never mind. Figured it out. Thanks