# prefect-community
n
Hi all. I am moving from a local setup to a cloud one in Fargate, using a Fargate Agent. I see tasks not executing in parallel, and the docs say that “the LocalExecutor is not capable of parallelism”, so I presume that is the issue (https://docs.prefect.io/api/latest/engine/executors.html#executor), but the other executors require a separate environment in Dask. Someone mentioned a “Fargate Executor” in this Slack, but I assume that’s an error, as it doesn’t exist from what I can find. I see others using the Fargate Agent and the FargateTaskEnvironment, so I wonder if that’s related. The Fargate Agent “deploy[s] flows as Tasks using AWS Fargate”, and “FargateTaskEnvironment is an environment which deploys your flow as a Fargate task”, so I don’t understand the use of it with the Fargate Agent. Also in your docs: “we recommend all users deploy their Flow using the LocalEnvironment configured with the appropriate choice of executor”. How do we get tasks to run in parallel inside a Flow run in Fargate without Dask? Also, @Marvin 🙂
e
I feel like you are confusing executors and environments. Executors are a part of environments. An environment specifies how and where Prefect should set up the necessary infrastructure for your flow to run. Once that setup is ready, the flow is executed according to the executor. This means FargateTaskEnvironment and DaskExecutor are compatible. Prefect will spin up a Fargate task with your flow in it, and will create a Dask cluster fully within your Fargate task. From the LocalEnvironment docs:
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor

flow = Flow(
    "Dask Executor Example",
    environment=LocalEnvironment(executor=DaskExecutor())
)
I believe you can safely specify an executor for FargateTaskEnvironment as well.
n
@emre thanks! I understand part of that (just in theory, from reading the docs), although you can see my confusion from the quotes describing the Fargate Agent and Environment, as flows do run in Fargate using the Fargate Agent. My main question is: we have a Fargate Agent starting flows in Fargate, but how do we make the tasks run in parallel without using Dask? At the moment it seems only one runs at a time.
e
So parallelism is handled by the executor, and only Dask supports parallelism AFAIK. I think I am missing something here, can you post a snippet of your flow.register(), or where you are setting up the FargateTaskEnvironment? Because simply changing the environment's executor parameter to DaskExecutor() should parallelize the tasks running inside your Fargate instance.
n
I only use the Fargate Agent. The Fargate Agent’s job is to register a task definition in Fargate for the flow and then run a Fargate Task for the flow run. I don’t use FargateTaskEnvironment as I’m not sure what it brings [in addition to using the Fargate Agent]; that was one question. But the main goal was to run more than one task at a time, and if this is only possible via Dask (unless we implement our own executors I guess?), that answers it! The flow snippet is just:
with Flow("Test") as flow:
    ...

flow.storage = S3(bucket="...")
flow.environment = LocalEnvironment(metadata={"image": "..."})

register_params = {}
register_params["project_name"] = environment
register_params["labels"] = [environment]
flow.register(**register_params)
e
Wait what 😅. I thought the Fargate Agent only worked with FargateTaskEnvironment. Still, adding executor=DaskExecutor() to your LocalEnvironment should let you seamlessly parallelize your flow, all within the Fargate instance. You are right, you will need your own executor if you wish to avoid Dask. I believe that is a pretty big undertaking though.
d
Hi @Nelson, Dask is our only out-of-the-box solution for parallelism right now. If you do end up writing your own executor, we’d love to hear how it goes!
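For anyone curious what "writing your own executor" might involve, here is a rough, standard-library-only sketch of the general shape: something that starts a worker pool, accepts submitted task functions, and waits on the resulting futures. It does not subclass or call anything in Prefect. The class name ThreadPoolSketchExecutor, its start/submit/wait methods, and the slow_task helper are all hypothetical and chosen only for illustration, so check the real Executor base class in your Prefect version before building on this.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait as wait_futures
from contextlib import contextmanager


class ThreadPoolSketchExecutor:
    """Hypothetical thread-backed executor (not a Prefect class)."""

    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self._pool = None

    @contextmanager
    def start(self):
        # Keep the thread pool alive for the duration of one run,
        # then shut it down when the block exits.
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            self._pool = pool
            try:
                yield self
            finally:
                self._pool = None

    def submit(self, fn, *args, **kwargs):
        # Schedule fn on the pool and hand back a future immediately.
        if self._pool is None:
            raise RuntimeError("submit() must be called inside start()")
        return self._pool.submit(fn, *args, **kwargs)

    def wait(self, futures):
        # Block until every future is done; return results in submit order.
        wait_futures(futures)
        return [f.result() for f in futures]


def slow_task(x):
    # Stand-in for an I/O-bound task (assumed workload, for the demo only).
    time.sleep(0.2)
    return x * 2


def run_demo():
    executor = ThreadPoolSketchExecutor(max_workers=4)
    with executor.start():
        futures = [executor.submit(slow_task, i) for i in range(4)]
        return executor.wait(futures)


if __name__ == "__main__":
    print(run_demo())
```

Threads like these mainly help with I/O-bound tasks. CPU-bound work in Python would generally need processes or a cluster, which is part of what Dask provides out of the box.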
n
Thank you very much @emre and @Dylan !