# prefect-community
s
Hi team! I have a few files which contain some different flows. There are some common functions that I want them all to share, all in a common.py file in the same directory. I could turn this into a GitHub package and make it pip-installable by itself, but is there a simple way of flagging to the storage that I also want common.py uploaded into my S3 bucket alongside the flow file?
j
Docker storage?

```python
files={pathlib.Path(__file__).parent.absolute() / "tasks.py": "/src/tasks.py"}
```

There's a `files` arg which lets you copy files into the container, and I believe it adds them to `PYTHONPATH`.
But that being said, I really would like to have more control over the image...
s
I'll look into both of them, thanks mate! Docker storage would be the robust option for sure; I'm just worried about having one image per flow. Python images are generally not that small, and transferring dozens of them per build will be slow.
j
A pattern that I generally recommend (where it makes sense) is to install all dependencies + custom utilities in a docker image, then share that image between all your flows. You can use an external storage mechanism (e.g. `GitHub`, `S3`) to host your flow source itself, allowing for quick updating of the flow; you'd only need to update the docker image if the dependencies or common functions changed.
For example, if you were deploying on docker with the flow source hosted on github:
```python
from prefect.run_configs import DockerRun
from prefect.storage import GitHub

flow.run_config = DockerRun(image="my-shared-image")
flow.storage = GitHub("my/repo", "flows/myflow.py")
```
Another option would be to use `Docker` storage, but use a base image with all your dependencies/common files present already. Then all your flows will share the layers in the base image and only add a layer holding the flow source itself (which should be quick, and won't take up that much space).
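A minimal sketch of that second option, assuming a shared base image already containing the dependencies and common.py (the registry and image names here are placeholders):

```python
from prefect.storage import Docker

# Build a small per-flow image on top of a shared base; only the flow
# source adds a new layer, so the shared layers are transferred once.
flow.storage = Docker(
    registry_url="my-registry.io",       # placeholder registry
    image_name="my-flow",
    base_image="my-org/base:latest",     # shared base with deps preinstalled
)
```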
s
Ah interesting, that might be very handy. Out of curiosity, how does this work with dask? I was under the impression (having never used dask before) that a flow picked up by a Docker agent with a Docker run config would still be executed on the dask workers, and thus wouldn't care about the docker run image at all. But I think I've misunderstood what run configs, executors and agents are, for sure.
j
Have you read through https://docs.prefect.io/orchestration/flow_config/overview.html? Does that help at all?
If you're using a dask executor, you'd want to ensure the dask workers are running the same image as your flow. How this works depends on how you configure the backing dask cluster (usually it's just forwarding an `image` kwarg somewhere).
Your flow source itself doesn't need to be accessible from the workers, just the dependencies it uses.
s
Yeah, I had a good 8-hour debug session a few days ago to get the say-hello example running (turns out there's a UI bug in 0.14.6, so rolling back to 0.14.5 fixed it), and read through it all, but my brain has struggled. Right, so anything inside a task needs to be on the dask image and doesn't need to be in the docker image? Even after rereading that overview I'm still confused as to run configurations vs executors and where dependencies need to live.
j
If it's confusing to you, it's likely confusing to others, so we should clean up that doc to be more useful
s
On second thought, given the import is in the main flow itself, I'm guessing the file would have to be present on both the dask worker image (so the tasks can use it) and the agent that runs the flow itself (as it's imported when running the flow). Would that be correct?
j
Under the hood, dask (and prefect) uses `cloudpickle` to distribute tasks around, which can distribute code defined in the main flow runner alone to the workers, without requiring the source be available. What this means for you as a user:
• Usually you want all containers started for a flow run to run the same image. This simplifies debugging, and removes one-more-thing to think about. Not strictly required, but it's best practice.
• If your task makes use of an external library (anything that you `import` into your script defining your flow), that library needs to be available in the image.
• Any code written in the script defining your flow can be pulled and distributed by prefect's storage mechanism, and thus doesn't have to be part of the image.
```python
from prefect import task, Flow
from myutils.stuff import a_helper  # myutils needs to be in the image

# This script itself doesn't need to be in the image

@task
def mytask(a):
    return a_helper(a)

with Flow("example") as flow:
    ...
```
> On second thought, given the import is in the main flow itself, I'm guessing the file would have to be present on both the dask worker image (so the tasks can use it) and the agent that runs the flow itself (as it's imported when running the flow). Would that be correct?
Agents never import or run user code; the agent only needs access to `prefect` itself. When an agent receives a flow to run, it will start a new process (referred to as the flow-runner process) that will run the flow. This will run your provided image and load your flow from storage. If you're using a dask executor, this may also start (or connect to) other processes to distribute your flow run. In that case, those other processes should also use the same image.
Does that help?
s
I feel like my lack of familiarity with Dask is getting in the way. From my current understanding: if I have a DockerAgent, it will execute DockerRun configs, which means it will launch a process that starts the built image? The image runs the flow, which has tasks, and because I have a Dask executor, those tasks aren't run in the docker container, but are instead passed to dask to schedule and execute using cloudpickle, so the Dask worker images need to have imports available and ready to go for any given task?
j
I think you're close. Repeating your example in more precise language:
• Your docker agent receives a json blob describing a flow run. In it is a `DockerRun` run config describing details of the container to use for that flow run, and a `Storage` object describing where to load the flow source from.
• The docker agent kicks off a new docker container using the config in the `DockerRun` object (this may specify an image to use, etc...).
• This container starts the flow runner process. It pulls the flow source from the provided `Storage` object and loads the flow.
• The flow runner sees it has a `DaskExecutor` configured. By default this starts a local dask cluster (in the same container), parallelizing your flow across several processes. No extra config is needed here. If you passed in a `cluster_class` to create a temporary external dask cluster, you'll also need to pass in the image to use, to ensure the environments match. Likewise, if you passed in an `address` to connect to an existing external cluster, you'll want to make sure the existing cluster has a compatible environment. Lots of flexibility here, which makes it hard to describe. See https://docs.prefect.io/orchestration/flow_config/executors.html#daskexecutor for more info.
• The flow runner starts processing the flow. Tasks will be sent out to the dask workers (which run either in the same container, or in external containers, depending on your config). Imports used by these tasks need to be available wherever the dask workers are running, but anything defined in the flow script file itself doesn't need to be.
s
Ah, that helps! To help me figure out if I even need the Dask scheduler+workers I've set up right now, can I ask what the difference is between DaskExecutor (with no external address) and LocalDaskExecutor? And a Docker RunConfig won't have any issues with the whole docker-in-docker thing?
Also, thanks for the detailed help so far, appreciate it!
s
FWIW, using a common base image for my org that has the boilerplate prefect code is exactly how I've set things up for now – great to see that I'm not the only one doing this @Jim Crist-Harif – we haven't fully productionized everything yet, but we're getting there, and we're going with a `DockerRun` config and `S3` for storage; we're recommending that everyone who wants to use prefect build their runconfig image using the common boilerplate one I've created as their base image, so that all of the behind-the-scenes stuff that can (and should) be abstracted away from them will immediately be available.
j
> what is the difference between DaskExecutor (with no external address) and LocalDaskExecutor?
Dask supports a few different scheduling backends (the original local scheduler we wrote, and the later distributed backend, which can also run locally). `LocalDaskExecutor` uses the original local scheduler; `DaskExecutor` uses the distributed backend. For prefect users, I'd hope that the description here (https://docs.prefect.io/orchestration/flow_config/executors.html#choosing-an-executor) is sufficient for choosing an executor; are there more details we should add here?
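A rough sketch of the two options, assuming you just want local parallelism (the `scheduler` and `num_workers` values are only examples):

```python
from prefect.executors import LocalDaskExecutor, DaskExecutor

# Lighter weight: runs tasks in a local thread (or process) pool using
# dask's original single-machine scheduler.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# Heavier: spins up a full dask.distributed cluster (local by default),
# parallelizing across several worker processes.
# flow.executor = DaskExecutor()
```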
> And a Docker RunConfig won't have any issues with the whole docker-in-docker thing?
The docker agent needs to run somewhere it can kick off docker containers (so if you're running it in a docker container itself, you'd need a docker-in-docker setup). The prefect flow runs themselves don't know they're in docker, so no docker-in-docker issues there.
s
@Samuel Hinton did you get answers to your issue with launching your docker agent from within a container? We're not planning on doing that, but if you are, I do assume that the agent-running container will need to have the docker CLI installed in it. To avoid docker-in-docker you can mount `docker.sock` into that agent container (see "The Socket Solution" section of this article on warnings about docker-in-docker).
s
Ah, so after reading that "Choosing an executor" section again, I'm wondering if I should give LocalExecutor a try again. Essentially, the tasks we have often involve requesting data from some truly slow and unreliable APIs, so I jumped to Dask because I didn't want to get caught up waiting. But I see that it's parallelism within a flow that we care about, so if I have a dozen flows for different endpoints they should all be able to spin up locally; it's just sequential within each flow.

Re docker-in-docker, I think so. Essentially we have a docker swarm that deploys prefect + agents, so the agents and everything else have to be defined in the same docker-compose file. I'm wondering now, given the use case of a dozen or so flows every half hour, and the docker-in-docker concerns, whether I should use a docker agent at all, or just try getting things working with a local agent (which is running off an image we build), and simply have a handful of local agents replicated, each of which just runs flows locally using local executors. Thoughts? This removes docker-in-docker concerns, but now dependencies are tied into the agent image itself rather than a separate docker image which the agents spin up.
j
I think getting the docker-in-docker setup working would be worth your time and simpler in the long run. I'm not sure what all is involved there, but I know people have done it before with prefect.
You might also make use of a `LocalDaskExecutor` for running all your tasks in a threadpool, if you do have opportunities for parallelism within a flow. This works great for network-heavy code, while being lighter weight than a full `DaskExecutor`. If you don't have opportunities for parallelism within a single flow, though, then a `LocalExecutor` makes sense.
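For the slow-API use case described above, that might look something like the following sketch (the endpoint URLs and the `fetch` task body are hypothetical):

```python
import urllib.request

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def fetch(url):
    # Hypothetical task hitting a slow external API; with the threads
    # scheduler, several of these can wait on the network concurrently.
    with urllib.request.urlopen(url) as resp:
        return resp.read()

with Flow("fetch-endpoints") as flow:
    # Mapping fans out one task run per URL, which the executor can
    # run in parallel within this single flow.
    results = fetch.map(["https://example.com/a", "https://example.com/b"])

flow.executor = LocalDaskExecutor(scheduler="threads")
```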
s
Okay, sounds like a plan. I'll see if I can get this running using a docker run config, and hopefully report back in the "show us what you got" channel with a resounding success story haha
j
I've opened an issue for docs on a docker-agent-in-docker setup: https://github.com/PrefectHQ/prefect/issues/4088. If you figure anything out, please comment there; otherwise we'll hopefully get to it soon.
s
Thanks mate!
k
Regarding cleaning up the documentation, I would be really grateful for an example of setting up a flow with a storage and agent + run config other than Local. @Samuel Hinton may remember from last week that I am trying to find out why my setup of GitHub storage + local agent/run is not working (i.e. when I trigger a run from the UI, it never executes; it just hangs. It gets submitted, then after a while nothing has happened, so I cancelled it). There is no problem when running it from the CLI on my local machine.
s
It might be overkill, but the only way I managed to fix a similar issue I had, with agents never executing tasks, was by completely wiping the images and the entire postgres database and rebuilding everything from scratch.
j
Karolina, that sounds like an issue with a flow label mismatch. You might try running your local agent with no labels (`prefect agent local start --no-hostname-label`), or add a label to both your local run config and the agent:

```python
from prefect.run_configs import LocalRun

flow.run_config = LocalRun(labels=["my-local-label"])
```

and to start the agent:

```
prefect agent local start --label my-local-label
```

See https://docs.prefect.io/orchestration/flow_config/run_configs.html#labels for more info.
Your point about more examples is taken though, we're hoping to find some time soon to revamp our docs.
k
Thanks @Jim Crist-Harif, I had already played around with labels before; I must have made some mistake, as this is not an issue anymore. Thanks for your help!