
Scott Aefsky

02/22/2022, 6:57 PM
Hi all. I have a couple of ETL flows that are going to run on a large quantity of data the first time they run, but much smaller datasets every subsequent run. What I want to do is run with a DaskExecutor on the first run, but a default executor on subsequent runs. I was wondering if it's possible to choose an executor at runtime based on a parameter, or if there is another way to achieve what I'm looking for without having multiple flows. Thanks!

Kevin Kho

02/22/2022, 6:59 PM
Hey @Scott Aefsky, I think what we can do here is parameterize the DaskExecutor through a callable. For example:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    # Imported inside the function, so it is only loaded when the cluster is created
    from distributed import LocalCluster
    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    # The parameter is read from the run context when this function is called
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow("example", executor=DaskExecutor(cluster_class=dynamic_executor)) as flow:
    flow.add_task(Parameter("n_workers", default=5))
Pass in a parameter to set the number of workers the first time you run and then set the default to a lower value?
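The callable helps because it defers the choice: as far as I understand, the executor only calls it when the flow run starts, by which point the parameters are known. Here is a minimal plain-Python sketch of that deferral with no Prefect involved. `run_context`, `FakeExecutor`, and `make_cluster` are hypothetical stand-ins for illustration, not Prefect APIs:

```python
# Stand-in for prefect.context.parameters; it is only filled in when a "run" starts.
run_context = {}

def make_cluster():
    # Reads the parameter when called, not when the function is defined.
    return {"kind": "local", "n_workers": run_context["n_workers"]}

class FakeExecutor:
    """Hypothetical executor that stores a factory and calls it on start."""

    def __init__(self, cluster_class):
        self.cluster_class = cluster_class

    def start(self, parameters):
        # Populate the run context first, then build the cluster from it.
        run_context.clear()
        run_context.update(parameters)
        return self.cluster_class()

executor = FakeExecutor(cluster_class=make_cluster)  # nothing is read yet
first = executor.start({"n_workers": 20})  # large initial load
later = executor.start({"n_workers": 2})   # small incremental run
```

The same executor object produces a differently sized cluster on each run, which is the behavior the parameterized `DaskExecutor` is meant to give.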

Scott Aefsky

02/22/2022, 7:00 PM
Ah, very cool. I will give that a shot. Thanks @Kevin Kho!
@Kevin Kho Is there any functional difference between:
with Flow("example", executor=DaskExecutor()) as flow:
    pass
and
with Flow("example") as flow:
    pass
flow.executor = DaskExecutor()
?
I ask because I realized we are using the second pattern, and am wondering if I need to switch to the first in order to use the dynamic executor you showed me.

Kevin Kho

02/22/2022, 8:48 PM
I believe there's no difference.

Scott Aefsky

02/22/2022, 8:48 PM
Got it, thanks!
Just following up, this did work the way I had hoped. This is what I actually implemented:
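import prefect
from dask_cloudprovider.aws import FargateCluster
from distributed import LocalCluster

# ECR_IMAGE, account, vpc, subnet, and sg are defined elsewhere
def dynamic_executor():
    # Bulk runs get a Fargate cluster; all other runs use a local cluster
    if prefect.context.parameters["bulk_run"] == 'true':
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,  # Must specify n_workers
            cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
        )
    else:
        return LocalCluster()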
One of the main reasons for doing this was to eliminate the spin-up time for the FargateCluster if we didn't need that many resources, hence the two different return types. Thanks again for the help @Kevin Kho!
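One thing to watch with the string comparison: `== 'true'` only matches that exact lowercase string, so a boolean `True` or the string `"True"` would fall through to the `LocalCluster` branch. A small helper could normalize the value first. This is only a sketch; `is_bulk_run` is a hypothetical name and not part of Prefect:

```python
def is_bulk_run(value):
    """Return True when the parameter value should be treated as a bulk run."""
    # Real booleans pass through unchanged.
    if isinstance(value, bool):
        return value
    # Anything else is compared as a trimmed, case-insensitive string.
    return str(value).strip().lower() in {"true", "1", "yes"}
```

Inside `dynamic_executor`, the condition would then become `if is_bulk_run(prefect.context.parameters["bulk_run"]):`.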

Kevin Kho

02/23/2022, 3:25 PM
Yes, exactly, that makes sense!