# ask-community
m
I'm able to run a flow on Coiled, but only if I run it with
flow.run(executor=ex)
as opposed to
create_flow_run
on a prefect client. When doing the latter, I get no apparent attempt to start the cluster, and I see task output on my docker agent (which, interestingly, is full of s3 permissions related crashes that don't happen if I don't specify my coiled/dask executor). When running with
flow.run
obviously the flow doesn't appear in my dashboard. What's the expected behavior? Am I supposed to have some other kind of agent running?
k
Hey @Michael Warnock, yes it is expected behavior that
flow.run()
does not appear on the dashboard. This is only for local testing. When you’re ready for production, you register the flow. The
client.create_flow_run
takes in a flow id to start so you would need to register first before you can use it. After registration, you can start a flow by clicking the “Quick Run” button in the UI, that will attempt to pass the flow to an agent. In your case, you want to spin up the local agent that would execute the Flow. Through the CLI it would be
prefect agent local start
. This agent will pick up and execute the flow runs. It will also pick up the scheduled flow runs. Just make sure agent labels match the flow labels.
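For example, a rough sketch assuming Prefect 1.x (the project name and the "dev" label are just placeholders):
Copy code
from prefect import Flow, task
from prefect.run_configs import LocalRun

@task
def say_hello():
    print("hello")

with Flow("hello-flow") as flow:
    say_hello()

# label the flow so only agents started with a matching label pick it up
flow.run_config = LocalRun(labels=["dev"])
flow.register(project_name="my-project")
Then start the agent with a matching label, e.g. prefect agent local start --label dev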
m
I've registered and run flows that way before- my problem is with trying to use the DaskExecutor configured for Coiled. I assigned it to .executor in my flow before registering, and I don't see anywhere else to specify it (register or create_flow_run seem like the likely places). What kind of run_config should I use (I'm using DockerRun)?
k
You don’t need to specify it elsewhere. When the flow runs, it will spin up the executor. For the RunConfig, if there is a specific image you need to run the flow on top of, then DockerRun is a good choice. You would need a Docker agent to run the flow.
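Something along these lines, just as a sketch (the Coiled software environment and image names are placeholders; swap in whatever you're already passing):
Copy code
from prefect.executors import DaskExecutor
from prefect.run_configs import DockerRun

# executor: created when the flow run starts, e.g. coiled.Cluster(**cluster_kwargs)
flow.executor = DaskExecutor(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "software": "my-org/my-software-env",  # placeholder Coiled software environment
        "n_workers": 4,
    },
)

# run config: the container the flow runner itself lives in (picked up by a docker agent)
flow.run_config = DockerRun(image="my-registry/my-image:latest")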
m
Well, as I said, when I run it with
create_flow_run
I see no attempt to spin up or connect to the dask cluster (which log would that appear in? I also see no indication on the coiled dashboard that a cluster has been started, or any new ec2 instances), and the docker-agent logs are full of errors related to s3 permissions that I don't get if the executor isn't specified.
those s3 permissions errors also don't occur in the dask workers when I use
flow.run
- it actually works, modulo a thread safety issue I'm in the process of fixing
k
Are you using S3 Storage?
m
docker
s3 is for the inputs and outputs of the task
k
The spinning up and the connection to the Dask cluster would appear in the Prefect UI under the Flow Logs. The executor shouldn’t be related to S3 permissions; do you have the necessary env variables in the Docker container for AWS access?
m
yes- it works both under docker without the dask executor, and within the coiled 'software_environment' (which specifies a docker image to start from that's almost identical - it uses the same dockerfile) when I use flow.run - it's only when using create_flow_run [with the dask/coiled executor] that it happens, and it's not a simple credentials-not-found error - it's something about not having permission for HeadObject - I don't have the error handy right now.
k
The
flow.run()
will not spin up the container locally to run the flow, so I think the environment variables that you have on your local machine are providing that authentication, and those are not in the container, which is why the errors are happening.
m
no. The same calls to s3 work inside the docker image that prefect builds if I do a
create_flow_run
without the dask-executor. If I fail to provide the credentials, I get an error along the lines of "credentials not found". If I do a
flow.run(executor=my_dask_executor)
an image is built by coiled, and run on the cluster it spins up. These tasks access s3 just fine. Something other than my credentials being there is wrong.
z
Hey Michael, we'll need some error logs to determine what's going on here. Does this work if you disable writing task results to S3?
m
ok- I passed credentials through the docker agent command line, which is running locally. I don't understand why it would be necessary, unless the tasks are running locally, and they certainly appear to be, though I have some activity on the dask cluster (which could easily be a not-cleanly-killed job I just had running)
yeah- there's no question the whole flow is running locally now (without the s3 error because of the credentials-to-docker-agent thing). It's ignoring the executor when I use create_flow_run. Do I maybe not want the DockerRun run_config? Trying that.
nope- still running locally.
z
How did you set the executor for your flow?
m
flow.executor = my_exec
before registering it
z
And how did you register your flow? Did you set the executor before the register call?
We don't persist executor settings to the database so it'll need to be in the pickled flow object. Are you using
stored_as_script=True
with your Docker storage?
(Sorry slack displayed your 'before registering it' message after I sent mine 🤷)
m
yeah, yours all just came in a lump; netsplit 🙂
Copy code
flow = ford.flow

executor = ford.get_coiled_executor(image_uri=image_uri, region_name=worker_config.region_name)
flow.executor = executor

#flow.run_config = ECSRun(image=docker_image)
#flow.run_config = DockerRun()#image=docker_image)

flow_id = flow.register(project_name='feature-generator')

prefect_client = Client()
prefect_client.create_flow_run(
    flow_id=flow_id,
    parameters=dict(job_spec=job_spec),
)
#flow.run(executor=executor, parameters=dict(job_spec=job_spec))
Copy code
flow.storage = Docker(
    dockerfile="./Dockerfile",
    prefect_directory="/usr/src/app",
    stored_as_script=True,
    path="/usr/src/app/feature_generator/ford.py"
)
z
Aha 😄
You are storing the flow as a script, so when your flow runs we are executing
/usr/src/app/feature_generator/ford.py
then extracting the flow from the variables in the file. Since you are setting the executor in a different file, the executor is never set on your flow run.
m
oooohh 🤦
z
It's tricky that some things are persisted to the backend when you call
flow.register
and some aren't. Executors are not persisted to allow more customizable options (i.e. we don't have to know how to serialize/deserialize them)
m
ok, so, I don't know the docker image uri to pass to coiled, until I'm ready to run the flow. how do I break out of this chicken and egg?
z
Perhaps something like
Copy code
import os

executor = ford.get_coiled_executor(image_uri=os.environ.get("IMAGE_URI"), region_name=worker_config.region_name)
flow.executor = executor
I guess personally, I'd use S3 storage for your flow then just build your docker image yourself though
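e.g. something roughly like this (the bucket name is a placeholder), so the image is entirely yours and Prefect only uploads the script:
Copy code
from prefect.storage import S3
from prefect.run_configs import DockerRun

flow.storage = S3(
    bucket="my-prefect-flows",                      # placeholder bucket
    stored_as_script=True,
    local_script_path="feature_generator/ford.py",  # uploaded at registration time
)
flow.run_config = DockerRun(image=image_uri)        # the image your CI already built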
m
how does using s3 storage for the flow help, if the executor isn't serialized?
and how do I populate the environment of the agent(?) when I only get the image-uri as part of the CI process that's starting the flow?
Sorry if I'm being dense; I haven't quite grokked the prefect architecture yet. Anyway, I need to run; will be back in an hour or so.
z
If you use S3 storage then you don't have a chicken/egg issue, the image_uri is just from your prebuilt image. Yeah setting the env var would be tricky. You can set them per run, but I don't see a clear way to do it in an automated fashion. One cool thing you could do is use the prefect kv-store and set a key on registration to the image URI then pull the key when your flow is loaded from storage (it would have to be script-based storage so it executes).
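Roughly like this (assumes Prefect Cloud, since that's where the KV store lives; the key name is made up):
Copy code
from prefect.backend import set_key_value, get_key_value

# at registration time (in CI), after pushing the image:
set_key_value(key="feature_generator_image_uri", value=image_uri)

# in the flow script, which re-executes whenever script-based storage loads the flow:
image_uri = get_key_value(key="feature_generator_image_uri")
flow.executor = ford.get_coiled_executor(
    image_uri=image_uri,
    region_name=worker_config.region_name,
)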
m
Sorry- I still don't get it. If I use S3 storage, the flow script/blob gets put there when I register, but without the executor; then I create_flow_run, and an agent grabs it and executes it (without the executor, or with the chicken and egg problem). What am I missing? Non-docker storage is a problem for other reasons, or I'd just try it.
z
I'm imagining something like this:
1. CI builds a docker image and pushes it to a predetermined URI
2. A downstream CI task registers your flow
3. Your flow script sets the executor to your predetermined URI (or you retrieve this from your earlier push step)
4. Your flow script is pushed to S3
5. You create a flow run; it pulls your flow from S3 and has the executor set correctly
m
what code makes step 3 happen?
z
Just set the executor in the same file that your flow is defined in
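i.e. in ford.py itself, something like this (IMAGE_URI is just a made-up env var name, and I'm assuming the region also comes from the environment; get_coiled_executor is your existing helper):
Copy code
# feature_generator/ford.py -- the file that storage executes to load the flow
import os

# ... your existing task and flow definitions ...

# this runs every time the script is loaded, so flow runs get the executor too
flow.executor = get_coiled_executor(
    image_uri=os.environ["IMAGE_URI"],                      # made-up env var; inject it via the run config
    region_name=os.environ.get("AWS_REGION", "us-east-1"),  # assumption: region via env
)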
m
you're describing the chicken-egg scenario then; how does it being on s3 help?
z
Because the docker image is not built at registration time
It's built before by you
m
that's still the chicken-egg scenario. I have the uri at registration time, but it's associated with that parameterized flow; it's created by the CI pipeline, which then executes the script that registers and runs the flow. I can't add it to the flow code before it's registered, and if my flow script can be pushed to s3 by some other step, I don't see what it is (manual copying?)
the flow's tasks depend on the version of the code that's embedded in the docker image by the CI
z
Prefect is building your dockerfile into an image right now at registration time; this gives you a URI, yes?
m
no- coiled is building a docker image, starting with the image the CI builds and puts in ECR, which I pass in the executor config
well, I suppose prefect is probably building a docker image too, based on the local dockerfile- this is what's so confusing, but I'm not using a uri from that
z
Okay. Why can it not work like this?
• Build an image from your Dockerfile with your flow's requirements
• Build another image derived from the above using coiled
• Push the image to ECR; set the URI in an environment variable
• In your flow script, set the executor to use the URI from the environment
• Register your flow, using pickle-based S3 storage
  ◦ Your flow object will be frozen to S3 with the correctly configured executor
If you want to use script-based storage:
• In your flow script, set the executor loading a URI from the environment
• When you register your flow, attach a RunConfig with the environment variable set to the URI from CI
I'd recommend the second, as pickle-based storage is often very confusing.
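Concretely, the registration side would look something like this (the env var name is arbitrary, just keep it consistent with what the flow script reads):
Copy code
from prefect.run_configs import DockerRun

# image_uri comes out of your CI build/push step
flow.run_config = DockerRun(env={"IMAGE_URI": image_uri})
flow_id = flow.register(project_name="feature-generator")
The flow script then picks the URI back up from os.environ when the agent loads it.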
m
ok- I still have no earthly idea why I would use s3 storage, but I guess the answer I was actually looking for is that I can set the environment variable for the agent in the RunConfig
z
If we are building the image for you at registration time (aka you are using Docker storage), you will not be able to set the value in the default run config for the flow. Building the image yourself and using something else to store your flow (literally any other storage) ensures that you have an image URI at registration time that you can set in a run config. But 🤷 you can also separate the
build()
/
register(build=False)
steps if you're married to Docker storage.
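That would look roughly like this (a sketch, not something I've run; the ECR registry URL is a placeholder):
Copy code
from prefect.storage import Docker
from prefect.run_configs import DockerRun

storage = Docker(
    registry_url="123456789.dkr.ecr.us-east-1.amazonaws.com",  # placeholder registry
    dockerfile="./Dockerfile",
    prefect_directory="/usr/src/app",
    stored_as_script=True,
    path="/usr/src/app/feature_generator/ford.py",
)
storage = storage.build()  # build + push happens here, before registration
image_uri = f"{storage.registry_url}/{storage.image_name}:{storage.image_tag}"

flow.storage = storage
flow.run_config = DockerRun(env={"IMAGE_URI": image_uri})
flow.register(project_name="feature-generator", build=False)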
m
what does "default run config for the flow" mean exactly? If I pass env={} to DockerRun in the code I pasted above, will the flow script the agent runs not see those variables?
z
If you set the
DockerRun
after you register the flow, it will not be attached to the flow
m
see the code above; I call register afterward
z
Then how are you going to have an image based on the one Prefect builds?
Very confused about this chicken/egg thing but it sounds like you've got it
m
I'm not- I use the one my CI builds to pass to coiled as the base image for what they will build: that part works- it's only the agent that has this problem, because the executor didn't get there
so I've moved the executor to the other file as you described, and I just need to have it pull from the environment now, and I think it should work- I'll let you know in a minute
z
I see; there's a third docker image in the mix 😄
m
I had to make coiled credentials available and call some coiled stuff only on the agent, but it's working. Thanks for the help!