M. Siddiqui
11/26/2021, 3:20 PM

Anna Geller
flow.run_config = ECSRun(
    labels=["prod"],
    task_role_arn="arn:aws:iam::XXXX:role/prefectTaskRole",
    execution_role_arn="arn:aws:iam::XXXX:role/prefectECSAgentTaskExecutionRole",
    image="XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest",
    run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
#2. Regarding dbt dependencies within the image, those would need to be installed within your Dockerfile, e.g.:
FROM prefecthq/prefect:latest-python3.9
RUN pip install dbt-snowflake==0.21.0
#3. Regarding storage: storage is an abstraction that defines where Prefect can find your flow definition. Examples of storage include a Git repository, a cloud storage object (e.g. S3), a local file, or a Docker image. This configuration is required because of the hybrid execution model, which keeps your code and data private while still taking full advantage of a managed workflow orchestration service.
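For example, a minimal sketch of S3 storage (the bucket name and key below are hypothetical placeholders):
from prefect.storage import S3

storage = S3(
    bucket="my-flow-bucket",  # hypothetical bucket name
    key="flows/my_flow.py",
    stored_as_script=True,  # keep the flow as a plain .py script instead of a pickle
)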
We’ve recently published this article about dbt - sharing in case it may be useful to you: https://www.prefect.io/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/
Regarding how to build a Docker image for Prefect, check out this post: https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a
And here is an example repo that shows how custom dependencies can be built into a Docker image and used across various run configurations: https://github.com/anna-geller/packaging-prefect-flows/

M. Siddiqui
11/26/2021, 3:34 PM
from random import randrange
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow
FLOW_NAME = "ecs-parallel"
STORAGE = S3(
    bucket="prefect-datasets-2",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this ensures the flow script is uploaded to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
    labels=["test"],
    task_role_arn="arn:aws:iam::115627025420:role/prefectTaskRole",
    run_task_kwargs=dict(cluster="prefectEcsCluster", launchType="FARGATE"),
)
@task
def random_num(stop):
    number = randrange(stop)
    print(f"Your number is {number}")
    return number

@task
def sum_numbers(numbers):
    print(sum(numbers))
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    stop = 100
    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)
    number_4 = random_num(stop)
    number_5 = random_num(stop)
    number_6 = random_num(stop)
    # bind the result to a new name so it doesn't shadow the sum_numbers task
    total = sum_numbers(numbers=[number_1, number_2, number_3, number_4, number_5, number_6])

if __name__ == "__main__":
    flow.register(project_name="elt", idempotency_key=flow.serialized_hash())
However, that wasn't the case (I think): the agent seemed to launch a new container, run the flow, and then exit.
I went ahead and brought the Desired Count of the prefectECSAgent up to 3 to see what happens. Oddly enough, Prefect Cloud could only see TWO ECS agents instead of three. Hence I bumped the count up to 5, and still only 2 agents were visible.
So just to confirm:
• Can a single flow be run by multiple (similar and dissimilar) agents? If that's not the case, would having multiple agents mean being able to run multiple FLOWS concurrently? And if I want parallelism within a single flow, should I rely on the Dask executor?
• Do you have an idea why bringing up the desired count of the prefectECSAgent did not correctly reflect the number of agents available? (I feel it may not be designed to be scaled in that manner.)
Looking forward to your reply!

Anna Geller
> And if I want parallelism within a single flow, I should rely on the Dask executor?
Correct, you could e.g. use LocalDaskExecutor. But you would need to use mapping to make your current flow example parallel:
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def generate_random_numbers():
    return list(range(1, 5))

@task
def add_one(x):
    return x + 1

@task(log_stdout=True)
def print_results(res):
    print(res)

with Flow("mapping-example", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
This entire flow will still run within one ECS task and will parallelize across existing threads.
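If thread-based parallelism isn't enough, LocalDaskExecutor can also be configured explicitly. A minimal sketch, assuming the flow above stays otherwise unchanged (the scheduler and num_workers values are illustrative):
from prefect.executors import LocalDaskExecutor

# "threads" is the default scheduler; "processes" sidesteps the GIL for CPU-bound tasks
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)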
M. Siddiqui
12/06/2021, 11:15 AM