Greg Desmarais
07/28/2020, 1:50 PM
Last State Message
[9:35am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'rightsize'")
class RightsizeFlow(Flow):
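Note (illustrative, not from the thread): the ModuleNotFoundError above is the usual symptom of unpickling an object whose defining module is not importable in the loading process. Pickle, and cloudpickle for importable modules, stores such objects by reference (module name plus attribute name) rather than by value. The sketch below reproduces that with the standard library only; the module name rightsize_demo and the function double are hypothetical stand-ins, not the real rightsize package.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def unpickle_without_module() -> str:
    """Pickle a function from a throwaway module, then load it with the module gone."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for the 'rightsize' package from the error above.
        Path(tmp, "rightsize_demo.py").write_text("def double(x):\n    return 2 * x\n")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("rightsize_demo")
            # Only the module and function names are stored, not the code itself.
            payload = pickle.dumps(module.double)
        finally:
            # Simulate a process that does not have the module installed.
            sys.path.remove(tmp)
            sys.modules.pop("rightsize_demo", None)
    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return f"{type(exc).__name__}: {exc}"
    return "loaded"


if __name__ == "__main__":
    print(unpickle_without_module())
```

Under these assumptions the load fails because the receiving side cannot import the module, which is why the process that loads the flow needs the flow's modules installed.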
Jim Crist-Harif
07/28/2020, 1:52 PM
Greg Desmarais
07/28/2020, 2:31 PM
I am using a DaskExecutor - does that mean my flow runner is the Dask scheduler? Or is it the Dask worker?
Jim Crist-Harif
07/28/2020, 2:37 PM
"I was hoping that the prefect server and agent didn't have to actually know anything about the flow - just an opaque blob to pass along."
That's how things work currently.
"Or is it the Dask worker"
When an agent receives a flow run (which doesn't actually load the flow), it kicks off a new "job" to run the flow. Depending on the agent and the environment this might be a local process, a k8s job, a fargate task, etc. In that job a FlowRunner object is created. If you're using a DaskExecutor, the flow runner will then create (or connect to) a dask cluster. The FlowRunner process needs access to the Flow object. If you're using dask, the workers shouldn't ever see the Flow object directly, but they will need access to any imports used by your tasks. The task functions themselves will be automatically serialized via cloudpickle, but any code that lives in a separate file from the flow will need to be available on the workers as well.
Greg Desmarais
07/28/2020, 2:43 PM
LocalEnvironment with my DaskExecutor in the flow, which, from your description, sounds like the agent needs access to all the modules used in the flow definition.
Jim Crist-Harif
07/28/2020, 2:47 PM
Greg Desmarais
07/28/2020, 2:48 PM
Jim Crist-Harif
07/28/2020, 2:51 PM
Greg Desmarais
07/28/2020, 2:53 PM
Jim Crist-Harif
07/28/2020, 2:53 PM
Greg Desmarais
07/28/2020, 3:14 PM
[2020-07-28 16:27:24,262] ERROR - agent | Error while deploying flow: NotImplementedError()
docker run \
--env PREFECT__CLOUD__GRAPHQL="http://${EC2_IP}:4200/graphql" \
--env PREFECT__CLOUD__API="http://${EC2_IP}:4200" \
--env PREFECT__BACKEND=server \
--rm \
--entrypoint python \
--name ${CONTAINER_NAME} \
386834949250.dkr.ecr.us-east-1.amazonaws.com/${IMAGE_NAME}:gdesmarais \
-c 'print("hi");from prefect.agent.agent import Agent;labels = ["s3-flow-storage"];agent = Agent(labels=labels);agent.start()'
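Note (illustrative, not from the thread): one likely reading of the NotImplementedError above is that the generic Agent base class leaves the deployment step to its subclasses, so starting the base class directly fails as soon as a flow run is picked up. The sketch below shows that pattern using only plain Python. BaseAgent, LocalProcessAgent, and try_deploy are hypothetical names, not Prefect's actual classes or methods.

```python
class BaseAgent:
    """Hypothetical generic agent: knows how to poll, not how to deploy."""

    def __init__(self, labels=None):
        self.labels = list(labels or [])

    def deploy_flow(self, flow_run):
        # Concrete agents (local process, docker, k8s, ...) must override this.
        raise NotImplementedError()

    def try_deploy(self, flow_run):
        # Mirror the log line from the thread: report the error instead of crashing.
        try:
            return self.deploy_flow(flow_run)
        except Exception as exc:
            return f"Error while deploying flow: {exc!r}"


class LocalProcessAgent(BaseAgent):
    """Hypothetical concrete agent that runs each flow run as a local process."""

    def deploy_flow(self, flow_run):
        return f"started local process for {flow_run}"


if __name__ == "__main__":
    print(BaseAgent(labels=["s3-flow-storage"]).try_deploy("run-1"))
    print(LocalProcessAgent(labels=["s3-flow-storage"]).try_deploy("run-1"))
```

If that reading is right, the fix is to start a concrete agent rather than instantiating the base class.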
Jim Crist-Harif
07/28/2020, 4:45 PM
docker run ... -c "prefect agent start". Alternatively, you could use prefect agent docker start to start the docker agent, which will run each flow run in a separate docker container.
Greg Desmarais
07/28/2020, 5:05 PM
prefect agent docker start command params, I didn't see a way to tell the agent what base image to use. I thought I would be in the same position about missing modules.
prefect agent start as the docker command - I updated to this:
docker run \
--env PREFECT__CLOUD__GRAPHQL="http://${EC2_IP}:4200/graphql" \
--env PREFECT__CLOUD__API="http://${EC2_IP}:4200" \
--env PREFECT__BACKEND=server \
--rm \
--entrypoint prefect \
--name ${CONTAINER_NAME} \
386834949250.dkr.ecr.us-east-1.amazonaws.com/${IMAGE_NAME}:gdesmarais \
agent start
Jim Crist-Harif
07/28/2020, 5:07 PM
Greg Desmarais
07/28/2020, 5:07 PM
Jim Crist-Harif
07/28/2020, 5:09 PM
Docker storage for the flow, or set the image name via flow.environment.metadata["image"] = ...
Greg Desmarais
07/28/2020, 5:10 PM
Jim Crist-Harif
07/28/2020, 5:13 PM
Docker storage, use that image
• If the flow's environment has an image key in the metadata field, use that image (flow.environment.metadata['image'])
• Otherwise it will use the default prefect:all_extras image
Docker storage or specify the image via environment metadata.
Greg Desmarais
07/28/2020, 5:15 PM
Jim Crist-Harif
07/28/2020, 5:15 PM
Greg Desmarais
07/28/2020, 5:15 PM
Jim Crist-Harif
07/28/2020, 5:15 PM
Greg Desmarais
07/28/2020, 5:16 PM
Jim Crist-Harif
07/28/2020, 5:17 PM
DaskExecutor connected to your dask cluster
• Flow runner starts running the flow on that executor
• Flow run completes, flow runner shuts down
• Container shuts down
Greg Desmarais
07/28/2020, 5:17 PM
Jim Crist-Harif
07/28/2020, 5:18 PM
Greg Desmarais
07/28/2020, 5:18 PM
Jim Crist-Harif
07/28/2020, 5:19 PM
Greg Desmarais
07/28/2020, 5:19 PM
Jim Crist-Harif
07/28/2020, 5:21 PM
flow.run(executor=executor) though.
Greg Desmarais
07/28/2020, 5:22 PM
And as a follow-on, is there a way to get the return values from a flow run?
Jim Crist-Harif
07/28/2020, 5:24 PM
flow.run(...), the output of that will return the flow's state, with the results of individual tasks attached.
Greg Desmarais
07/28/2020, 5:24 PM
Jim Crist-Harif
07/28/2020, 5:25 PM
Greg Desmarais
07/28/2020, 5:26 PM
josh
07/28/2020, 5:27 PM
Greg Desmarais
07/28/2020, 5:28 PM
josh
07/28/2020, 5:29 PM
Greg Desmarais
07/28/2020, 5:30 PM
Jim Crist-Harif
07/28/2020, 5:41 PM