Thread
#prefect-community
    Daniel Katz
    10 months ago
    Hello Prefect community! I would like to be able to run a flow using the DaskExecutor and pass a parameter like num_dask_workers which would be used when initializing the DaskExecutor at run-time. Is it possible to parametrize a flow's Executor properties?
    Kevin Kho
    10 months ago
    Hey @Daniel Katz, unfortunately not. It’s quite tricky. There is a hack outlined in this Github issue.
    Michael Adkins
    10 months ago
    Hey @Daniel Katz -- you can pass a callable that returns an executor as the flow.executor, then peek at parameters in the context from there.
    Kevin has put doubt in my heart 😄 but I believe I've recommended this successfully before. Something like...
    import prefect
    from prefect import Flow, Parameter
    from prefect.executors import DaskExecutor
    
    def dynamic_executor():
        return DaskExecutor(n_workers=prefect.context.parameters["n_workers"])
        
    with Flow(..., executor=dynamic_executor) as flow:
        n_workers = Parameter("n_workers", default=5)
    Daniel Katz
    10 months ago
    I'll give it a shot right now, thx both @Michael Adkins @Kevin Kho
    Kevin Kho
    10 months ago
    I believe Michael’s suggestion will work
    Daniel Katz
    10 months ago
    hmm, I'm getting this error:
    Unexpected error: AttributeError("'FunctionTask' object has no attribute 'start'")
    Kevin Kho
    10 months ago
    Could you share your code snippet?
    Daniel Katz
    10 months ago
    @Kevin Kho ^
    Kevin Kho
    10 months ago
    You need to call it:
    executor=get_dask_executor()
    But it’s not working for other reasons
    Yeah it looks like parameters is populated after the executor is set up
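    A quick way to see this (rough sketch, run outside of any flow run):
    import prefect
    
    # At flow-definition time the run context has no parameters yet, so an
    # eagerly-constructed executor has nothing to read from.
    print(prefect.context.get("parameters"))  # None until a flow run starts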
    Daniel Katz
    10 months ago
    ok, got it
    thx @Kevin Kho
    Kevin Kho
    10 months ago
    @Michael Adkins could confirm
    Michael Adkins
    10 months ago
    You shouldn't call it; we'll call it at flow runtime, and the parameters should be populated if run with an agent, I think
    I'm very confused by your error and code. It looks like you had a task decorator on your executor function
    Kevin Kho
    10 months ago
    I replicated it with the same code snippet. There was no task decorator above the executor
    Michael Adkins
    10 months ago
    Weird. I'm ooo at the moment. I can take a look when I'm back.
    Sorry I gave you the wrong syntax 🙃
    Anna Geller
    10 months ago
    @Daniel Katz translating Michael’s example into your use case would be:
    import prefect
    from prefect import Flow, Parameter
    from prefect.executors import DaskExecutor
    
    
    def dynamic_executor():
        from distributed import LocalCluster
    
        # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
        return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
    
    
    with Flow(
        "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
    ) as flow:
        flow.add_task(Parameter("n_workers", default=5))
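    With this, the worker count can then be overridden per run, e.g. (sketch):
    flow.run(parameters={"n_workers": 10})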
    Daniel Katz
    10 months ago
    worked! thx @Anna Geller @Michael Adkins
    An Hoang
    10 months ago
    @Anna Geller @Michael Adkins what if I want to switch between DaskExecutor, LocalDaskExecutor, and LocalExecutor? I did:
    import prefect
    from prefect import Flow, Parameter, context
    from prefect.executors import DaskExecutor, LocalDaskExecutor
    
    def dynamic_executor():
        # executor type comes from a flow parameter at run time
        executor_type = context.parameters["executor"]
        if executor_type == "local_dask":
            executor = LocalDaskExecutor()
        elif executor_type == "cluster_dask":
            executor = DaskExecutor("tcp://172.18.102.9:8789")
        elif executor_type == "default":
            executor = None
        return executor
    
    with Flow("OVP_step2", executor=dynamic_executor) as dev_flow:
        dev_flow.add_task(Parameter("executor", default="default"))
    And the error was
    AttributeError: 'function' object has no attribute 'start'
    Like @Kevin Kho said, I think this is because parameters is populated after the executor is set up. Any way around this?
    Anna Geller
    10 months ago
    @An Hoang this dynamic_executor function should return a Dask cluster, not a DaskExecutor - your function currently returns an executor
    Michael Adkins
    10 months ago
    Unfortunately, you can't do this. The syntax there is not supported; you can only pass a callable to the cluster_class of a DaskExecutor.
    We'll support this in Orion in the next release.
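    You can still vary the cluster itself inside that callable, though. Something like (untested sketch; "cluster_size" is a made-up parameter name):
    import prefect
    from prefect.executors import DaskExecutor
    
    def dynamic_cluster():
        from distributed import LocalCluster
    
        # pick the cluster shape at run time from a flow parameter
        if prefect.context.parameters.get("cluster_size") == "big":
            return LocalCluster(n_workers=8)
        return LocalCluster(n_workers=2)
    
    executor = DaskExecutor(cluster_class=dynamic_cluster)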
    Chris Arderne
    10 months ago
    I take it this pattern isn't supported for the address parameter of DaskExecutor?
    Michael Adkins
    10 months ago
    Nope.
    I think you could hack it, but it'd be pretty goofy.
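    E.g. (rough sketch, env var name made up): since the executor is constructed when the flow is loaded for a run, you could read the address from the environment instead of a Parameter:
    import os
    from prefect.executors import DaskExecutor
    
    # hypothetical env var set on the agent before the run is loaded
    flow.executor = DaskExecutor(address=os.environ.get("DASK_SCHEDULER_ADDRESS"))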
    An Hoang
    8 months ago
    Hey, just following up on this. It's not working for me. The cluster-creation logs print and I can access the dashboard and see the scheduler, but no workers start up even after I wait for a long time. I even added cluster.scale(n_workers) inside dynamic_LSF_dask_executor, but that didn't work either. The same configuration works when I create a Dask cluster separately and then provide the address at flow-creation time. Any clue as to why? What I have for the Dask executor:
    #export
    import dask
    import dask_jobqueue
    print(f"{dask.__version__=}")
    print(f"{dask_jobqueue.__version__=}")
    
    #export
    from dask_jobqueue import LSFCluster
    import prefect
    from prefect.executors import DaskExecutor
    
    #export
    DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE = 45
    DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER = 5
    DEFAULT_QUEUE = "all_corradin"
    DEFAULT_MAXIMUM_WORKERS = 47 * 17
    
    #export
    def dynamic_LSF_dask_executor():
        # cluster shape comes from flow parameters at run time
        cores_per_machine = prefect.context.parameters["cores_per_machine"]
        processes_per_worker = prefect.context.parameters["processes_per_worker"]
        queue = prefect.context.parameters["queue"]
        n_workers = prefect.context.parameters["n_workers"]
    
        workers_per_machine = round(cores_per_machine / processes_per_worker)
    
        cluster = LSFCluster(
            queue=queue,
            cores=cores_per_machine,
            processes=workers_per_machine,
            n_workers=n_workers,
            walltime="9000:00",
            memory="500GB",
            local_directory="/tmp",
            job_extra=["-o /dev/null", "-e /dev/null"],
        )
        logger = prefect.context.get("logger")
        logger.debug("Dask cluster started")
        logger.debug(f"\n{cores_per_machine=}\n{processes_per_worker=}\n{queue=}\n{n_workers=}\n")
        logger.debug(f"Go to this link after running the above to see the dashboard: {cluster.dashboard_link}")
        return cluster
    
    
    LSF_dask_executor = DaskExecutor(cluster_class=dynamic_LSF_dask_executor, adapt_kwargs={"maximum": DEFAULT_MAXIMUM_WORKERS})
    In my flow:
    with Flow("OVP_step2", executor =  LSF_dask_executor) as step2_flow:
        #other params...
        ....
        #parameters used in dynamic executor functions
        step2_flow.add_task(Parameter("cores_per_machine", default = DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE))
        step2_flow.add_task(Parameter("processes_per_worker", default = DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER))
        step2_flow.add_task(Parameter("n_workers", default = DEFAULT_MAXIMUM_WORKERS))
        step2_flow.add_task(Parameter("queue", default = DEFAULT_QUEUE))
    Kevin Kho
    8 months ago
    How many workers are you adding first before the adapt?
    An Hoang
    8 months ago
    n_workers in the LSFCluster creation is 799. I don't see any dask-worker jobs submitted in my LSF HPC like it normally would either
    Kevin Kho
    8 months ago
    I can’t see any reason why it would be different, but it’s also the first time someone has come back to me about an HPC cluster. Do you just start it with:
    cluster = LSFCluster(...)
    when you do it yourself and pass the address?
    An Hoang
    8 months ago
    yep! basically
    cluster = dynamic_LSF_dask_executor(); cluster.scale(799)
    Kevin Kho
    8 months ago
    It’s hard for me to isolate whether it’s because of the LSF cluster or because of the callable. Let me verify the parameter approach works later. Do your Prefect logs show anything? Do you hit this line?
    An Hoang
    8 months ago
    Yep, I do
    When I remove the adapt_kwargs, I can see my Parameter tasks on the dashboard now; before, it was empty
    Kevin Kho
    8 months ago
    Ah, I see. Then I guess just directly use cluster.scale?
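    i.e., something like this (sketch) -- drop the adapt_kwargs and let the n_workers parameter size the cluster:
    LSF_dask_executor = DaskExecutor(cluster_class=dynamic_LSF_dask_executor)  # no adapt_kwargs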
    An Hoang
    8 months ago
    yay it works now! I didn't have to use cluster.scale from within the callable. Thank you so much for the help Kevin! Especially at this hour!!!
    Kevin Kho
    8 months ago
    Of course! I am happy to finally have proof HPC Clusters will work as a Dask Executor.
    An Hoang
    8 months ago
    ~Oh wait, maybe I spoke too soon. So now all the tasks are completed on the Dask dashboard, but flow.run() is stuck forever... This never happened with the premade Dask cluster. Problem for tomorrow....~ 😅
    Kevin Kho
    8 months ago
    Ah ok
    An Hoang
    8 months ago
    Nevermind! Doubly spoke too soon. It took 15 mins-ish from the tasks being done on the Dask dashboard to flow.run() finishing
    Kevin Kho
    8 months ago
    Ah lol