# prefect-server
g
Is it possible to extend the prefect Flow with my own flow class that implements some common patterns? If I do that, and I use my own flow class in the context manager during a flow creation on the client, which downstream services need that flow class code? I am seeing this in my prefect server ui:
```
Last State Message
[9:35am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'rightsize'")
```
my flow class is called:
```python
class RightsizeFlow(Flow):
```
Does my container running the prefect server need to have my Rightsize code installed in some way?
j
I recommend not subclassing if you can avoid it (prefect isn't designed for this use case). That said, the flow code should only ever be loaded in the flow runner; the prefect server and the agent should never load your flow.
The message you're seeing in the UI is a log message sent from the flow runner to the server for display in the UI; it doesn't indicate that the UI/server is loading the flow.
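(If it helps, a hypothetical sketch of the factory-function alternative to subclassing - `make_rightsize_flow` is illustrative, not an official pattern:)
```python
from prefect import Flow

def make_rightsize_flow(name, **kwargs):
    """Build a plain Flow with shared defaults, instead of subclassing.

    Downstream services then never need a custom class to be importable.
    """
    flow = Flow(name, **kwargs)
    # attach shared config here (schedules, result handlers, etc.)
    return flow
```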
g
I am using a DaskExecutor - does that mean my flow runner is the dask scheduler?
I was hoping that the prefect server and agent didn't have to actually know anything about the flow - just an opaque blob to pass along.
Or is it the Dask worker?
j
> I was hoping that the prefect server and agent didn't have to actually know anything about the flow - just an opaque blob to pass along.
That's how things work currently.
> Or is it the Dask worker?
When an agent receives a flow run (which doesn't actually load the flow), it kicks off a new "job" to run the flow. Depending on the agent and the environment this might be a local process, a k8s job, a fargate task, etc. In that job a `FlowRunner` object is created. If you're using a `DaskExecutor`, the flow runner will then create (or connect to) a dask cluster. The `FlowRunner` process needs access to the `Flow` object - if you're using dask, the workers shouldn't ever see the `Flow` object directly, but they will need access to any imports used by your tasks. The task functions themselves will be automatically serialized via cloudpickle, but any code that lives in a separate file from the flow will need to be available on the workers as well.
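(A minimal sketch of this setup, assuming the 0.12-era API - `rightsize.helpers.compute_stats` stands in for code living in a separate module, and the scheduler address is a placeholder:)
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

# Lives in a separate module: cloudpickle ships the task function itself,
# but this import must resolve on every dask worker.
from rightsize.helpers import compute_stats

@task
def summarize(data):
    return compute_stats(data)

with Flow("rightsize-example") as flow:
    summarize([1, 2, 3])

# The flow runner process holds the Flow object; workers only run the tasks.
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```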
g
Ok - starting to make sense. I am using a `LocalEnvironment` with my DaskExecutor in the flow, which, from your description, sounds like the agent needs access to all the modules used in the flow definition.
I'm using a vanilla agent right now, given that the agent defers to the dask scheduler for distributing the work.
j
Yeah, with a local agent and environment, the agent python environment will need access to your code. Note that the agent itself will never import that code, so you can change/add/delete stuff without affecting the agent. But the agent will kick off a job using the same python environment it's running in currently.
g
If I put the agent in a container with all the available code and start it myself through python, it seems like I should be ok.
sorry - typed before reading.
j
Yeah, that should work fine.
g
I'm trying to move everything to individual docker containers - but all running off a common image, so they all share the same environment.
j
You might try the docker agent then, it will kick off a docker container per flow run, rather than a local process.
g
ooohh...maybe...
ok, I went down the path of starting the agent in a container with access to my custom code (not the docker agent). I got past the import issue, but hit this in the agent:
```
[2020-07-28 16:27:24,262] ERROR - agent | Error while deploying flow: NotImplementedError()
```
I start my agent with this:
```bash
docker run \
	--env PREFECT__CLOUD__GRAPHQL="http://${EC2_IP}:4200/graphql" \
	--env PREFECT__CLOUD__API="http://${EC2_IP}:4200" \
	--env PREFECT__BACKEND=server \
	--rm \
	--entrypoint python \
	--name ${CONTAINER_NAME} \
	386834949250.dkr.ecr.us-east-1.amazonaws.com/${IMAGE_NAME}:gdesmarais \
	-c 'print("hi");from prefect.agent.agent import Agent;labels = ["s3-flow-storage"];agent = Agent(labels=labels);agent.start()'
```
ok, switched to LocalAgent instead of Agent...looks good!
The error seemed like an abstract class error.
j
Wait, why are you starting an agent that way? Any reason not to use the prefect cli for accomplishing this? The agent classes themselves aren't really user-facing.
If you want to run a local agent in a docker container, you should be able to do `docker run ... -c "prefect agent start"`. Alternatively, you could use `prefect agent docker start` to start the docker agent, which will run each flow run in a separate docker container.
The local agent starts flow runs in local processes.
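(Sketches of both options - the image name is a placeholder, and config like the API address is omitted:)
```bash
# Local agent inside a container built from an image containing your code:
docker run --rm my-custom-image prefect agent start

# Docker agent on the host, launching a fresh container per flow run, so
# the agent's own environment never needs your code:
prefect agent docker start
```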
g
I went with using the docker run command to get the agent access to the right modules. When I looked at the `prefect agent docker start` command params, I didn't see a way to tell the agent what base image to use, so I thought I would be in the same position about missing modules.
I get your point about just using `prefect agent start` as the docker command - I updated to this:
```bash
docker run \
	--env PREFECT__CLOUD__GRAPHQL="http://${EC2_IP}:4200/graphql" \
	--env PREFECT__CLOUD__API="http://${EC2_IP}:4200" \
	--env PREFECT__BACKEND=server \
	--rm \
	--entrypoint prefect \
	--name ${CONTAINER_NAME} \
	386834949250.dkr.ecr.us-east-1.amazonaws.com/${IMAGE_NAME}:gdesmarais \
	agent start
```
j
Since the docker agent starts flow runs in separate containers, it doesn't need access to any of your code; the default image should work fine.
Only your flow runs need access to your custom image
g
ok - let me give that a shot - see what happens.
j
Note that if you want your flow runs to use a custom image (rather than the default prefect image) you need to either use `Docker` storage for the flow, or set the image name via `flow.environment.metadata["image"] = ...`.
g
You mean the actual running of the task code?
If so, I have those running in dask workers - which are started up from the same self-built image...
j
When the docker agent gets a request to start a flow run, it will look for an image to use for that flow run. The process is:
• If the flow is configured with `Docker` storage, use that image
• If the flow's environment has an `image` key in the metadata field, use that image (`flow.environment.metadata['image']`)
• Otherwise, use the default `prefect:all_extras` image
Since your flow requires custom code, you'll want that to be available in the flow runner image, so you'll either want to use `Docker` storage or specify the image via environment metadata.
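(A sketch of the metadata route, assuming the 0.12-era `LocalEnvironment` - image name, scheduler address, and project name are placeholders:)
```python
from prefect import Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

with Flow("rightsize-example") as flow:
    ...

flow.environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://dask-scheduler:8786")
)
# The docker agent will launch the flow run's container from this image:
flow.environment.metadata["image"] = "registry.example.com/my-image:tag"
flow.register(project_name="my-project")
```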
g
I am confused.
j
I admit this isn't straightforward, and the docs could definitely be clearer - we're hoping to streamline this process and revamp the docs in the next month or so.
g
How does the DaskExecutor come into play here?
j
Do you have a long-running dask cluster it's connecting to, or are you having it start a cluster?
g
long running
and on a big ecs cluster
I can't do anything local to the agent machine.
I already have the flow set up with a DaskExecutor pointing at my dask cluster.
dask scheduler address for the dask cluster, that is.
Which is why the docker agent seems so weird
j
So in that case:
• Agent gets a request to start a flow run
• Agent finds the image associated with that flow
• Agent starts a new container with that image for executing the flow run
• Flow runner starts in that container
• Flow runner creates a `DaskExecutor` connected to your dask cluster
• Flow runner starts running the flow on that executor
• Flow run completes, flow runner shuts down
• Container shuts down
g
Shouldn't the agent just pass along my tasks to the dask scheduler?
typed too slow.
j
Prefect is designed so that agents never actually touch user code. The agent could start a k8s job that runs your code, but the thing talking to our servers never gets access to it.
g
how is running in the container with a docker agent better than a local agent in that case?
I'm hosting my own servers - not sure if that was clear...
j
For your use case, it doesn't seem like it's an improvement. If I were you I'd just run the local agent, since all the work is happening remotely. For other users it might make more sense.
g
Sure..
Ok - so I got a flow running with a local agent by registering it from the client, then using the prefect Client to create a flow run...
Is there a lightweight way to trigger a flow run from a client without registering a flow?
And as a follow on, is there a way to get the return values from a flow run?
j
If you're using cloud/server we require an explicit registration step. You can run without cloud/server using `flow.run(executor=executor)` though.
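(For example, with a placeholder scheduler address:)
```python
from prefect.engine.executors import DaskExecutor

# No registration needed; the flow runs in this process and farms
# task execution out to the dask cluster.
state = flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```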
g
hmm...ok - I hadn't tried that one. Then the executor, which is configured to point to my dask scheduler, would get the flow tasks...
And I do have access to the dask cluster from the client...
how about this one, since I have your attention
> And as a follow on, is there a way to get the return values from a flow run?
j
If you're using `flow.run(...)`, it will return the flow's final state, with the results of individual tasks attached.
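(A sketch of pulling a task's return value out of that state - `my_task` is whichever task object the flow was built with:)
```python
state = flow.run(executor=executor)
task_state = state.result[my_task]  # State object for that one task
value = task_state.result           # the task's return value
```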
g
ok, cool...how about a registered flow?
j
When running with orchestration, you can query using the client to get the locations where any cached results are stored, but since the actual flow execution happens remotely there's not really an obvious place to return them to you directly. Many things can trigger a flow run, including UI activity.
cc @josh for further tips on the above ^^
g
Ok - I haven't even started down the path of results and the like.
j
Yep @Jim Crist-Harif that is spot on! Accessing the location the Result type writes to is how you would get the return values when running with a backend API. Also, you could optionally make a task in your flow that is responsible for putting the values somewhere else if you don't want to use results (aka a load-type task in an ETL workflow).
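(A sketch of the Result approach, with a placeholder bucket name - every task's output gets persisted so it can be read back after a registered flow run finishes:)
```python
from prefect import Flow, task
from prefect.engine.results import S3Result

@task
def produce():
    return 42

# A flow-level result applies to all tasks in the flow.
with Flow("example", result=S3Result(bucket="my-results-bucket")) as flow:
    produce()
```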
g
I can dig into the docs, but any pointers on storing and accessing results for a registered flow?
j
No pointers exactly, but here are some relevant links that should get you on the right path 🙂
https://docs.prefect.io/core/concepts/results.html#result-objects
https://docs.prefect.io/core/idioms/targets.html
g
ok, thx...I'm on my way again. At least far enough to start a new thread with the next question!
oh - wait...
well...ok, different thread...
What I'm now figuring out is how to do all the results stuff with a registered flow that is run with `Client.create_flow_run`.
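(A sketch of triggering a run of a registered flow with the Client - the flow id is a placeholder you'd get from registration or the UI:)
```python
from prefect import Client

client = Client()
flow_run_id = client.create_flow_run(flow_id="<your-flow-id>")
```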