
Mattias Petter Johansson

09/14/2023, 9:20 AM
@Marvin When using ECS and Prefect, what is the distinction between an ECS worker pool and the ECS infrastructure block? When creating a deployment, should I use both?

Marvin

09/14/2023, 9:21 AM
Ah, the joy of cloud infrastructure terminology. It's almost as if they're trying to make it confusing. But I digress.

In the context of Prefect and AWS ECS, an ECS work pool and the ECS infrastructure block serve different purposes.

The ECS infrastructure block (`ECSTask`, from the `prefect-aws` collection) describes how a single flow run is launched as an ECS task: which cluster to use, the EC2 or Fargate launch type, networking, security groups, IAM roles, and so on. Deployments served by an agent attach this block to tell the agent what infrastructure to create.

An ECS work pool, on the other hand, is a concept within Prefect Server and Cloud. It holds the scheduled flow runs for its deployments together with a base job template describing the infrastructure they should run on. An ECS worker polls the pool and, for each flow run it picks up, launches an ECS task built from that template.

So, should you use both when creating a deployment? Typically not: they are two alternative mechanisms. If your runs go through an ECS work pool and a worker, the pool's job template defines the infrastructure; if they go through an agent, the `ECSTask` infrastructure block does.

Remember, an ECS task is not the same thing as a Prefect task. ECS tasks run as part of an ECS cluster and launch the container(s) defined in an ECS task definition, which is the blueprint describing which Docker container(s) to run and what resources to use[^1^]. And if you're looking for a quick start, there's a Terraform module that can help you set up your infrastructure[^1^].

[^1^]: What are common pitfalls when setting up prefect flow execution in aws ecs

Mattias Petter Johansson

09/14/2023, 9:25 AM
@Marvin Why do both the work pool and the infrastructure block have definitions for Execution Role ARN? Should I define the Execution Role ARN in both, or does one fall back to the value of the other?

Marvin

09/14/2023, 9:25 AM
Ah, the joy of cloud permissions. It's like a maze designed by a sadist. But I digress.

Both settings refer to the same ECS concept: the task execution role. The execution role is assumed by ECS itself to set up your container, covering things like pulling the Docker image from ECR and writing logs to CloudWatch. It is distinct from the Task Role ARN, which your flow code assumes at runtime for its own AWS API calls (reading from S3, writing to DynamoDB, and so on).

The field appears in both places because the infrastructure block and the work pool are two different mechanisms for launching flow runs: agent-based deployments read infrastructure settings from the `ECSTask` block, while worker-based deployments read them from the work pool's base job template. One does not fall back to the other, so set the Execution Role ARN in whichever mechanism you are actually using; there is no need to maintain it in both.

Remember, security in AWS is a shared responsibility. AWS is responsible for the security of the cloud, and you are responsible for security in the cloud[^1^].

[^1^]: AWS Shared Responsibility Model
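For context, both roles discussed here are ordinary IAM roles trusted by ECS tasks. Below is a sketch of the trust policy such a role needs, written as a plain Python dict; the managed policy ARN is AWS's real constant for the usual execution-role permissions, while any role names you attach it to are your own:

```python
import json

# Trust policy allowing ECS tasks to assume the role. The same trust
# relationship is used for both the execution role and the task role;
# only the permission policies you attach differ.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "ecs-tasks.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# For the execution role, AWS ships a managed policy covering image pulls
# from ECR and log shipping to CloudWatch.
execution_role_managed_policy = (
    "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
)

print(json.dumps(trust_policy, indent=2))
```

The task role gets no managed policy by default; you attach exactly the permissions your flow code needs (least privilege).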

Mattias Petter Johansson

09/14/2023, 9:41 AM
@Marvin what is the distinction between the Task Role ARN and Execution Role ARN setting?

Marvin

09/14/2023, 9:42 AM
Ah, the joy of AWS IAM. It's like a puzzle where all the pieces look the same. But I digress.

In AWS ECS, the Task Role ARN and the Execution Role ARN serve different purposes.

The Task Role ARN is the identity assumed by the containers in the ECS task at runtime. Your own code uses this role to make AWS API calls. For example, if your flow needs to read from an S3 bucket, the credentials come from the task role.

The Execution Role ARN, on the other hand, is used by ECS itself to manage the infrastructure needed to run your task: pulling the Docker image from ECR, publishing logs to CloudWatch, retrieving secrets for the container, and so on.

So, in essence, the task role is for your code's AWS API calls, and the execution role is for ECS's AWS API calls. And yes, you should define both. They serve different purposes and need different permissions. They could point at the same role if the two permission sets happen to overlap, but that's not typically the case.

Remember, in AWS, permissions are not transitive. Just because the ECS service has permission to do something doesn't mean the tasks it runs have the same permissions[^1^].

[^1^]: IAM Roles for Tasks - Amazon Elastic Container Service
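To make that concrete, here is where the two ARNs land in an ECS task definition, sketched as the plain dict you would pass to boto3's `ecs.register_task_definition`. The account ID, role names, region, and image are placeholders:

```python
# Minimal ECS task definition sketch. executionRoleArn and taskRoleArn
# are both top-level fields, but they are assumed by different parties.
task_definition = {
    "family": "prefect-flow-run",
    "requiresCompatibilities": ["FARGATE"],
    "networkMode": "awsvpc",
    "cpu": "256",
    "memory": "512",
    # Assumed by ECS itself: pull the image from ECR, ship logs to CloudWatch.
    "executionRoleArn": "arn:aws:iam::123456789012:role/prefect-execution-role",
    # Assumed by your flow's code at runtime: S3 reads, DynamoDB writes, etc.
    "taskRoleArn": "arn:aws:iam::123456789012:role/prefect-task-role",
    "containerDefinitions": [
        {
            "name": "prefect",
            "image": "123456789012.dkr.ecr.eu-north-1.amazonaws.com/flows:latest",
        }
    ],
}
```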

Mattias Petter Johansson

09/14/2023, 10:08 AM
@Marvin Getting close now, I can feel it 🙂 I am getting this error, probably due to me not copying files correctly or setting a work directory in the Dockerfile. How should a Dockerfile be set up to include the source when running a dockerized flow in Prefect?

```
00542.099 | ERROR | Flow run 'accurate-chital' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 395, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 203, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=deployment.path, local_path=".")
  File "/usr/local/lib/python3.11/site-packages/prefect/filesystems.py", line 156, in get_directory
    copytree(from_path, local_path, dirs_exist_ok=True, ignore=ignore_func)
  File "/usr/local/lib/python3.11/shutil.py", line 559, in copytree
    with os.scandir(src) as itr:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
```

Marvin

09/14/2023, 10:09 AM
Ah, the joy of Docker. It's like a game of Tetris where the blocks keep changing shape. But I digress.

The error you're seeing is because Prefect is trying to retrieve your flow from a directory in the Docker container that doesn't exist. This is typically because the Dockerfile used to build the image didn't include the necessary `COPY` command to copy your flow code into the image, or the `WORKDIR` command to set the working directory to the location of your flow code.

Here's a basic example of what your Dockerfile might look like:

```dockerfile
FROM prefecthq/prefect:latest

# Set the working directory
WORKDIR /opt/prefect/flows

# Copy the flow code into the Docker image
COPY ./flows /opt/prefect/flows
```

This Dockerfile sets the working directory to `/opt/prefect/flows` and copies your flow code into that directory in the Docker image. Replace `./flows` with the path to your flow code relative to your build context; note that a `COPY` source cannot be an absolute path on your machine, it is always resolved relative to the directory you run `docker build` from.

Then you can build the Docker image with:

```bash
docker build -t your-image-name .
```

Replace `your-image-name` with whatever you want to call your Docker image. Remember, Docker is like an onion. It has layers[^1^].

[^1^]: Dockerfile reference

Mattias Petter Johansson

09/14/2023, 10:15 AM
@Marvin what is the general idea with storage blocks and code vs including the code in the container? Is Prefect designed to download the source code of the flows rather than including it in the Docker images?

Marvin

09/14/2023, 10:15 AM
Ah, the joy of code deployment. It's like trying to put a square peg in a round hole. But I digress.

In Prefect, you have a few options for how to include your flow code when you run your flows.

One option is to include your flow code in your Docker image, as I described in my previous message. This is a good option if your flows don't change very often, or if you're comfortable building and pushing a new Docker image every time your flows change.

Another option is to use a Prefect storage block. A storage block is a way to store your flow code outside of your Docker image and download it at runtime. This is a good option if your flows change frequently, or if you want to avoid building and pushing a new Docker image every time your flows change. When you use a storage block, you specify a location where your flow code is stored (like a GitHub repo or an S3 bucket), and Prefect takes care of downloading your flow code and making it available to your flow at runtime.

So, is Prefect designed to download the source code of the flows rather than including it in the Docker images? It's designed to support both options, and the best one for you depends on your specific use case. Remember, there's no one-size-fits-all solution in software development[^1^].

[^1^]: Prefect Concepts: Storage
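To make the trade-off concrete, here is a minimal, stdlib-only sketch of what a filesystem storage block's "download at runtime" step amounts to. It mirrors the `copytree` call visible in the traceback above; the paths and file names are invented for illustration:

```python
import shutil
import tempfile
from pathlib import Path

def fetch_flow_code(from_path: str, local_path: str) -> list:
    """Mimic a local storage block's get_directory: copy the stored
    flow code into the working directory before the run starts."""
    shutil.copytree(from_path, local_path, dirs_exist_ok=True)
    return sorted(p.name for p in Path(local_path).iterdir())

# Simulate a container where /opt/prefect/flows exists and holds one flow file.
storage = Path(tempfile.mkdtemp()) / "flows"
storage.mkdir()
(storage / "my_flow.py").write_text("print('hello from the flow')\n")

workdir = tempfile.mkdtemp()
print(fetch_flow_code(str(storage), workdir))  # ['my_flow.py']

# If the source directory is missing -- the situation in the traceback --
# the copy raises FileNotFoundError before the flow can even be loaded.
try:
    fetch_flow_code(str(storage.parent / "missing"), tempfile.mkdtemp())
except FileNotFoundError:
    print("flow retrieval fails before the flow can load")
```

Baking the code into the image skips this copy step entirely, which is exactly why the deployment must then be told not to pull from storage.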

Mattias Petter Johansson

09/14/2023, 10:23 AM
@Marvin Okay, so I'll be going for the include-flow-code-in-the-Docker-container route, I like the clean idea of that somehow. I tried removing the storage block from my deployment CLI code and now it looks like this:

```bash
prefect deployment build \
    ./flows/coinapi-import-aggregated.py:import_coinapi_rates_aggregated \
    -n coinapi-test-depl-9 \
    --interval 1800 \
    --timezone Europe/Stockholm \
    --pool ecsworkpool \
    --work-queue default \
    --infra-block ecs-task/dynamic-ecs-task-5 \
    --output prefect-test.yaml \
    --apply
```

but that gives me the error `RuntimeError: Unable to load 'dynamic-ecs-task-5' of block type None due to failed validation. To load without validation, try loading again with validate=False.` Can you give me a hint to what might be wrong?

Marvin

09/14/2023, 10:23 AM
Ah, the joy of error messages. They're like riddles written in a foreign language. But I digress.

The error message means Prefect is unable to load the infrastructure block `dynamic-ecs-task-5`. The most common causes are:

1. The infrastructure block doesn't exist. Make sure you've created a block named `dynamic-ecs-task-5` and saved it to your workspace.
2. The block is of the wrong type. The `--infra-block` option expects a `type-slug/name` reference to an infrastructure block, so `ecs-task/dynamic-ecs-task-5` must point at an `ECSTask` block. Note that `ECSTask` comes from the `prefect-aws` collection, so the machine running the command needs `prefect-aws` installed and the block type registered; "block type None" often means the type itself couldn't be resolved.
3. The block is invalid: missing required fields, or fields with invalid values. Check the configuration of `dynamic-ecs-task-5` and make sure it's correct.

Remember, the `--apply` option of `prefect deployment build` immediately applies the deployment, so if any of the resources it references are invalid or can't be loaded, you'll get an error right away[^1^].

[^1^]: Prefect CLI: Deployment
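The `--infra-block` value is always a `block-type-slug/block-name` pair. A small, hypothetical helper (not part of Prefect's API, shown only to illustrate the expected shape) that splits such a reference:

```python
def parse_block_slug(ref: str):
    """Split a 'type-slug/name' block reference, e.g. the value passed
    to --infra-block or --storage-block, into its two parts."""
    type_slug, sep, name = ref.partition("/")
    if not sep or not type_slug or not name:
        raise ValueError(f"expected 'type-slug/name', got {ref!r}")
    return type_slug, name

print(parse_block_slug("ecs-task/dynamic-ecs-task-5"))
# ('ecs-task', 'dynamic-ecs-task-5')
```

The type slug before the slash must match a block type registered in the workspace, and the name after it must match a saved block document of that type.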

Mattias Petter Johansson

09/14/2023, 10:55 AM
@Marvin can you explain the distinction between workers and agents again?

Marvin

09/14/2023, 10:55 AM
Ah, the joy of distributed computing. It's like trying to herd cats. But I digress.

In Prefect, agents and workers actually play the same role: both are lightweight processes that poll for scheduled flow runs and launch the infrastructure to execute them. The difference is generational.

An agent is the older mechanism. It polls one or more work queues, and when it picks up a flow run, it launches whatever infrastructure block (Process, DockerContainer, ECSTask, KubernetesJob, and so on) is attached to the deployment.

A worker is the newer replacement. Workers are typed (an ECS worker, a Kubernetes worker, and so on), and each polls a work pool of the matching type. Instead of reading an infrastructure block from the deployment, a worker builds the flow run's infrastructure from the work pool's base job template, which deployments can override field by field.

So the worker/work-pool model supersedes the agent/infrastructure-block model; you generally pick one or the other, not both. Remember, in a distributed system, coordination is key[^1^].

[^1^]: Prefect Concepts: Execution
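A toy, stdlib-only sketch of the worker/work-pool relationship described above (all names and fields invented for illustration): the pool holds scheduled runs plus a base job template, and the worker merges per-run overrides into that template when it launches a run.

```python
from collections import deque

# A work pool: a queue of scheduled flow runs plus a base job template
# describing the infrastructure every run starts from.
pool = {
    "base_job_template": {"cpu": 256, "memory": 512, "cluster": "prefect-cluster"},
    "queue": deque([
        {"flow_run": "accurate-chital", "overrides": {"memory": 1024}},
        {"flow_run": "steady-gopher", "overrides": {}},
    ]),
}

def worker_poll_once(pool):
    """One polling cycle: pop a scheduled run, merge its overrides into
    the pool's base job template, and 'launch' the resulting job."""
    if not pool["queue"]:
        return None
    run = pool["queue"].popleft()
    job = {**pool["base_job_template"], **run["overrides"]}
    return {"flow_run": run["flow_run"], "job": job}

launched = worker_poll_once(pool)
print(launched["flow_run"], launched["job"]["memory"])  # accurate-chital 1024
```

In the real system the "launch" step is an ECS `run_task` call (or a Kubernetes Job, a subprocess, etc.), but the merge-template-and-launch loop is the essence of what a worker does.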

Mattias Petter Johansson

09/14/2023, 11:48 AM
@Marvin Hmmm, I'm getting this error:

```
Downloading flow code from storage at '/opt/prefect/flows'
012818 PM prefect.flow_runs ERROR Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 394, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments/deployments.py", line 213, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=deployment.path, local_path=".")
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 156, in get_directory
    copytree(from_path, local_path, dirs_exist_ok=True, ignore=ignore_func)
  File "/usr/local/lib/python3.10/shutil.py", line 557, in copytree
    with os.scandir(src) as itr:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
012818 PM prefect.flow_runs
```

It's trying to download code from storage, which I don't want it to do; the code is already copied to /opt/prefect/flows in the container. The Dockerfile looks like this:

```dockerfile
FROM prefecthq/prefect:2-python3.9
WORKDIR /opt/prefect
COPY flows /opt/prefect/flows
RUN pip install s3fs prefect-aws clickhouse-connect
```

Marvin

09/14/2023, 11:49 AM
Ah, the joy of file paths. It's like trying to find your way in a labyrinth with no map. But I digress.

The error message you're seeing is because Prefect is still trying to download your flow code from a storage block, even though you've included your flow code in your Docker image. When you build a Prefect deployment, Prefect automatically includes a storage block in the deployment configuration, and that storage block is used to download your flow code at runtime.

If you want to include your flow code in your Docker image and not use a storage block, you need to tell Prefect not to include the storage block in the deployment configuration. You can do this by setting the `--no-storage-block` option in the `prefect deployment build` command:

```bash
prefect deployment build \
    ./flows/coinapi-import-aggregated.py:import_coinapi_rates_aggregated \
    -n coinapi-test-depl-9 \
    --interval 1800 \
    --timezone Europe/Stockholm \
    --pool ecsworkpool \
    --work-queue default \
    --infra-block ecs-task/dynamic-ecs-task-5 \
    --no-storage-block \
    --output prefect-test.yaml \
    --apply
```

This will tell Prefect not to include a storage block in the deployment configuration, so Prefect will not try to download your flow code at runtime. Remember, the path to success is often a winding one[^1^].

[^1^]: Prefect CLI: Deployment

Mattias Petter Johansson

09/14/2023, 11:52 AM
@Marvin when I do that I get `No such option: --no-storage-block (Possible options: --infra-block, --storage-block)`

Marvin

09/14/2023, 11:53 AM
Ah, the joy of command line options. It's like trying to find a needle in a haystack. But I digress.

I apologize for the confusion; it appears there was a misunderstanding. Prefect does not currently have a `--no-storage-block` option for the `prefect deployment build` command.

When you're building a Prefect deployment, Prefect expects to be able to retrieve your flow code from a storage block. If you're including your flow code in your Docker image, you'll still need to include a storage block in your deployment configuration, but you can use a local storage block, which doesn't actually move any files across the network. Here's how you can modify your command:

```bash
prefect deployment build \
    ./flows/coinapi-import-aggregated.py:import_coinapi_rates_aggregated \
    -n coinapi-test-depl-9 \
    --interval 1800 \
    --timezone Europe/Stockholm \
    --pool ecsworkpool \
    --work-queue default \
    --infra-block ecs-task/dynamic-ecs-task-5 \
    --storage-block local/local-storage \
    --output prefect-test.yaml \
    --apply
```

This command includes a local storage block called `local-storage`. It tells Prefect that your flow code is stored locally, i.e., it's already included in your Docker image. Remember, the devil is in the details[^1^].

[^1^]: Prefect CLI: Deployment

Mattias Petter Johansson

09/14/2023, 11:55 AM
@Marvin Now I get

```
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1355, in read_block_document_by_name
    response = await self._client.get(
  File "/opt/homebrew/lib/python3.11/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/opt/homebrew/lib/python3.11/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/base.py", line 280, in send
    response.raise_for_status()
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.cause
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/17770e10-ebc2-41fb-a8fb-b40c9227a507/workspaces/272c69e7-1b2a-4ae5-aa15-4b37bbcf25c4/block_types/slug/local/block_documents/name/local-storage?include_secrets=true'
Response: {'detail': 'Block document not found'}
For more information check: https://httpstatuses.com/404

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/blocks/core.py", line 727, in _get_block_document
    block_document = await client.read_block_document_by_name(
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1361, in read_block_document_by_name
    raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
prefect.exceptions.ObjectNotFound

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 383, in call
    return self.result()
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/cli/deployment.py", line 1097, in build
    storage = await Block.load(storage_block)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/blocks/core.py", line 822, in load
    block_document, block_document_name = await cls._get_block_document(name)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/homebrew/lib/python3.11/site-packages/prefect/blocks/core.py", line 731, in _get_block_document
    raise ValueError(
ValueError: Unable to find block document named local-storage for block type local
```

And the deploy command looks like this:

```bash
prefect deployment build \
    ./flows/coinapi-import-aggregated.py:import_coinapi_rates_aggregated \
    -n coinapi-test-depl-10 \
    --interval 1800 \
    --timezone Europe/Stockholm \
    --pool ecsworkpool \
    --skip-upload \
    --storage-block local/local-storage \
    --work-queue default \
    --infra-block ecs-task/dynamic-ecs-task-5 \
    --output prefect-test.yaml \
    --apply
```

Marvin

09/14/2023, 11:56 AM
Ah, the joy of deployment. It's like trying to assemble a jigsaw puzzle with missing pieces. But I digress.

The error message you're seeing is because Prefect is trying to find a storage block named `local-storage` of type `local`, but no such block document exists. The `--storage-block` option expects a `type-slug/block-name` reference to a storage block that has already been created and saved to your workspace.

If you want to use a local filesystem storage block, you need to create and save one first. You can do this with the Prefect Python API:

```python
from prefect.filesystems import LocalFileSystem

# Create a block pointing at the flow code already baked into the image
storage = LocalFileSystem(basepath="/opt/prefect/flows")

# Save it to the workspace so deployments can reference it by name
storage.save("local-storage")
```

The slug for this block type is `local-file-system`, so reference it as `local-file-system/local-storage` rather than `local/local-storage`:

```bash
prefect deployment build \
    ./flows/coinapi-import-aggregated.py:import_coinapi_rates_aggregated \
    -n coinapi-test-depl-10 \
    --interval 1800 \
    --timezone Europe/Stockholm \
    --pool ecsworkpool \
    --skip-upload \
    --storage-block local-file-system/local-storage \
    --work-queue default \
    --infra-block ecs-task/dynamic-ecs-task-5 \
    --output prefect-test.yaml \
    --apply
```

Remember, the path to success is often a winding one[^1^].

[^1^]: Prefect Concepts: Storage

Mattias Petter Johansson

09/14/2023, 12:00 PM
@Marvin can I do this with cli?

Marvin

09/14/2023, 12:01 PM
Ah, the joy of command line interfaces. It's like trying to play a piano with boxing gloves on. But I digress.

The Prefect CLI does not currently create configured block documents directly. The `prefect block` subcommands can register block types (for example, `prefect block register -m prefect_aws.ecs`) and list or delete existing blocks, but creating a configured block instance is done either through the Prefect UI or through the Python API with `block.save("name")`, as in my previous message.

Remember, the command line is a powerful tool, but it's not always the right tool for the job[^1^].

[^1^]: Prefect CLI

Mattias Petter Johansson

09/14/2023, 12:46 PM
@Marvin if I have created the flow in the cloud interface, can I specify the storage in the flow definition code?

Marvin

09/14/2023, 12:47 PM
Ah, the joy of cloud interfaces. It's like trying to paint a picture with a mouse. But I digress.

Yes, you can specify the storage when you build the deployment in Python rather than on the command line. In Prefect 2, `Deployment.build_from_flow` accepts a `storage` argument:

```python
from prefect.deployments import Deployment
from prefect.filesystems import LocalFileSystem

from flows.my_flow_module import my_flow  # your @flow-decorated function

# Storage pointing at the flow code already baked into your Docker image
storage = LocalFileSystem(basepath="/opt/prefect/flows")

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    storage=storage,
    work_queue_name="default",
)
deployment.apply()
```

This builds a deployment for your flow with a local filesystem storage block and applies it to your workspace. The storage configuration is saved with the deployment's metadata, and at runtime Prefect uses it to locate your flow code. Remember, the best way to predict the future is to create it[^1^].

[^1^]: Prefect Concepts: Storage

Mattias Petter Johansson

09/14/2023, 12:56 PM
@Marvin how do I do this if I am using @flow decorators, what is the syntax?