# prefect-server
m
Hello guys! I am currently doing some ETL operations via DBT. As our use cases start to grow, we decided to opt for an orchestration tool other than Lambdas on AWS. I had experience working with Airflow in the past, but within DBT's docs I stumbled upon Prefect, and it seemed to check ALL the right boxes regarding the pain points I struggled with in Airflow. Hence, as a newbie, I'm a little confused about some concepts:
• Since Agents are responsible for actually executing the tasks, are we responsible for handling their environment's Python dependencies? For example, I'm trying to create a Dockerfile which would be run on an AWS ECS container. I found a Prefect Docker image on Docker Hub which would serve as a base image for adding DBT dependencies so that the agent can execute the code. Is this assumption right? I can find Prefect CLI commands to deploy agents on ECS, Docker, Kubernetes, etc., but I don't see anywhere where the dependencies get resolved...
• I'm also a little confused as to the purpose of Storage... Is it a compiled version of the flows? What benefit do I get by adding a GitHub or S3 Storage layer?
Thanks!
a
#1. Are we responsible for handling their environment's Python dependencies? Yes, it's often handled via a custom image that can be set via the run configuration, e.g.:
Copy code
flow.run_config = ECSRun(
    labels=["prod"],
    task_role_arn="arn:aws:iam::XXXX:role/prefectTaskRole",
    execution_role_arn="arn:aws:iam::XXXX:role/prefectECSAgentTaskExecutionRole",
    image="<http://XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest|XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest>",
    run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
#2. Regarding dbt dependencies within the image, those would need to be installed within your Dockerfile, e.g.:
Copy code
FROM prefecthq/prefect:latest-python3.9
RUN pip install dbt-snowflake==0.21.0
#3. Regarding storage: Storage is an abstraction that defines where Prefect can find your flow definition. Examples of storage could be a Git repository, a cloud storage object (e.g. S3), a local file, or a Docker image. This configuration is required because of the hybrid execution model, which keeps your code and data private while still taking full advantage of a managed workflow orchestration service.
We've recently published this article about dbt, sharing in case it may be useful to you: https://www.prefect.io/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/
Regarding how to build a Docker image for Prefect, check out this post: https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a
And here is an example repo that shows how custom dependencies can be built into a Docker image and used across various run configurations: https://github.com/anna-geller/packaging-prefect-flows/
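To make the storage idea a bit more concrete, here is a minimal sketch of pointing a flow at GitHub storage, assuming Prefect 1.x; the repository name, file path, and secret name are placeholders:
Copy code
from prefect import task, Flow
from prefect.storage import GitHub


@task(log_stdout=True)
def say_hello():
    print("hello from GitHub storage")


with Flow("github-storage-example") as flow:
    say_hello()

# At runtime the agent fetches the flow file from this repository,
# so the flow code itself never has to be baked into the Docker image;
# only the Python dependencies do.
flow.storage = GitHub(
    repo="my-org/my-flows",                     # placeholder repository
    path="flows/github_storage_example.py",     # path of this file within the repo
    access_token_secret="GITHUB_ACCESS_TOKEN",  # Prefect Secret holding a GitHub token
)
The benefit over baking the code into the image is that you can change the flow script without rebuilding the image, since the agent pulls whatever version the storage points at.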
m
Amazing, thanks for the immediate response! Will go through them one by one 🙏
🙌 1
@Anna Geller Sorry to bother you again, but I've tried out the ECS agent tutorial you wrote up (great read btw 👏). I'm still getting around some AWS concepts, we usually have dedicated DevOps people handling these sorts of tasks, but since I'm experimenting with this feature, it'll be up to me 😅
Anyway, I changed the necessary variables in the bash script you shared and ran a couple of basic tasks. So far so good! What I initially assumed based on the read and the diagram is that each task within the flow would trigger the agent to spin up a new ECS machine 😅 which I thought would be limited by how many concurrent tasks we've allowed in our Prefect settings, hence the Prefect ECS agent would dynamically spin the necessary ECS containers up and down based on the flow. This is the code I'm currently testing out to test my hypothesis:
Copy code
from random import randrange
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow

FLOW_NAME = "ecs-parallel"
STORAGE = S3(
    bucket="prefect-datasets-2",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
    labels=["test"],
    task_role_arn="arn:aws:iam::115627025420:role/prefectTaskRole",
    run_task_kwargs=dict(cluster="prefectEcsCluster", launchType="FARGATE", ),
)

@task
def random_num(stop):
    number = randrange(stop)
    print(f"Your number is {number}")
    return number


@task
def sum_numbers(numbers):
    print(sum(numbers))


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG, ) as flow:
    stop = 100

    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)
    number_4 = random_num(stop)
    number_5 = random_num(stop)
    number_6 = random_num(stop)

    total = sum_numbers(numbers=[number_1, number_2, number_3, number_4, number_5, number_6])

if __name__ == "__main__":
    flow.register(project_name="elt",
                  idempotency_key=flow.serialized_hash())
However that wasn't the case (I think): the agent seemed to have launched a new container, run the flow, and then exited. I went ahead and brought the Desired Count of the prefectECSAgent service up to 3 to see what happens. Oddly enough, Prefect Cloud could only see TWO ECS agents instead of three. Hence I bumped the count up to 5, and still only 2 agents were visible. So just to confirm:
• Can a single flow be run by multiple (similar and dissimilar) agents? If that's not the case, would having multiple agents mean being able to run multiple FLOWS concurrently? And if I want parallelism within a single flow, should I rely on the Dask executor?
• Do you have an idea why bringing up the desired count of the prefectEcsAgent did not correctly reflect the number of agents available? (I feel it may not be designed to be scaled in that manner.)
Looking forward to your reply!
a
@M. Siddiqui you can run as many flow runs of each flow as you wish; the ECS agent registers a task definition and runs an independent ECS task for each flow run.
The ECS agent itself is a single process to which you assign a label. This process polls for scheduled flow runs, and if there are new flow runs, it executes them as new ECS tasks on the specified ECS cluster. That's why, when you scale the ECS service to multiple containers and all those containers run an agent with the same label, all those agents will be polling for the same flows (i.e. flows with a run configuration set to ECSRun with a label such as "prod"), which may result in a flow run being picked up by more than one agent and executed twice.
To scale the agent process itself (if you really need to), you would deploy another ECS service, but with an agent that has a different label. This way one agent could poll for flows with the label "prod" and another would poll for a different label. Does it make sense?
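If it helps, here is a rough sketch of what two separately labelled agents could look like when started from Python, assuming Prefect 1.x; the labels and cluster name are placeholders, and the CLI equivalent would be prefect agent ecs start with the matching options:
Copy code
from prefect.agent.ecs.agent import ECSAgent

# Agent 1: only picks up flow runs whose ECSRun labels include "prod"
ECSAgent(cluster="prefectEcsCluster", labels=["prod"]).start()

# Agent 2 would run as a separate process / separate ECS service and
# only poll for "dev"-labelled flow runs:
# ECSAgent(cluster="prefectEcsCluster", labels=["dev"]).start()
Each agent process polls independently, so giving them distinct labels is what keeps them from competing for the same flow runs.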
And if I want parallelism within a single flow, should I rely on the Dask executor?
Correct, you could e.g. use LocalDaskExecutor. But you would need to use mapping to make your current flow example parallel:
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def generate_random_numbers():
    return list(range(1, 5))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True)
def print_results(res):
    print(res)


with Flow("mapping-example", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
This entire flow will still run within a single ECS task and will parallelize across threads within that task.
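In case it's useful, here is a minimal sketch of attaching such an executor to the ECS flow from earlier before registering it, assuming Prefect 1.x; the num_workers value is just an illustration:
Copy code
from prefect.executors import LocalDaskExecutor

# parallelize independent/mapped tasks across threads inside the single ECS task
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
flow.register(project_name="elt", idempotency_key=flow.serialized_hash())
If you ever need parallelism across multiple machines rather than threads, that is where DaskExecutor pointed at an external Dask cluster would come in.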
m
Understood, thanks a lot for the detailed reply!
🙌 1