# ask-community
n
Hi everyone, I'm working on dynamically passing the Dask address to a flow so I can manage execution more flexibly. The goal is to be able to use different clusters, or the same external address, and scale execution efficiently. I experimented with a few approaches but ran into some issues.

Attempted approaches:

1. Passing a function to the `@flow` decorator:
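```python
from prefect import flow
from prefect_dask import DaskTaskRunner

def get_dask_task_runner(address=None):
    if address:
        return DaskTaskRunner(address=address)
    else:
        return DaskTaskRunner()

@flow(
    task_runner=get_dask_task_runner,  # the function itself, not a TaskRunner instance
)
def my_flow(n: int, dask_address: str = None):
    ...
```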
This approach raised an error:
```
task_runner = stack.enter_context(self.flow.task_runner.duplicate())
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'duplicate'
```
2. Calling the function directly in the decorator: the runner is created once, when the decorator runs, so I can't pass parameters dynamically.

```python
@flow(task_runner=create_task_runner())
```
3. Manual handling inside the flow:
```python
@flow(validate_parameters=False)
def my_flow(n: int, dask_address: str = None):
    task_runner = get_task_runner(dask_address)
    with task_runner:
        result = my_task.submit(n)
        return result.result()
```
This works, but it feels cumbersome and less integrated than I'd like.

Code to reproduce:

```python
from typing import Optional

from prefect import flow, task
from prefect_dask import DaskTaskRunner

def get_task_runner(dask_address: Optional[str] = None):
    return DaskTaskRunner(address=dask_address) if dask_address else DaskTaskRunner()

@task
def my_task(x):
    return x * 2

@flow(validate_parameters=False)
def my_flow(n: int, dask_address: Optional[str] = None):
    task_runner = get_task_runner(dask_address)
    with task_runner:
        result = my_task.submit(n)
        return result.result()

if __name__ == "__main__":
    # Run with a local Dask cluster
    # print(my_flow(5))

    # Run with a remote Dask cluster (plain address string, no angle brackets)
    print(my_flow(5, dask_address="tcp://127.0.0.1:8786"))
```
Question: Is there a better way to pass a dynamic Dask address directly to the `@flow` decorator for cleaner, more seamless execution? Any suggestions or best practices would be greatly appreciated. Thanks in advance for any help or insights! I'm currently using Prefect 3.
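Approach 2 above (calling the factory inside `@flow(...)`) cannot see runtime parameters because Python evaluates decorator arguments exactly once, when the `def` statement executes at import time, long before any flow parameters exist. The sketch below shows that timing in plain Python; `fake_flow` and `create_task_runner` are hypothetical stand-ins for `@flow` and the runner factory, not Prefect APIs.

```python
calls = []


def create_task_runner(address=None):
    """Hypothetical runner factory; records every address it is called with."""
    calls.append(address)
    return f"runner(address={address})"


def fake_flow(task_runner):
    """Hypothetical stand-in for @flow(task_runner=...): it just stores the value."""
    def decorate(fn):
        fn.task_runner = task_runner
        return fn
    return decorate


# create_task_runner() runs right here, while the module is being loaded.
@fake_flow(task_runner=create_task_runner())
def my_flow(n, dask_address=None):
    # dask_address arrives too late to influence the runner chosen above
    return n * 2


result = my_flow(5, dask_address="tcp://127.0.0.1:8786")
print(calls)                # [None]: the factory ran once, at definition time
print(my_flow.task_runner)  # runner(address=None)
```

So the address has to be applied at call time rather than at decoration time.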
n
hi @n3ura - do you know about `with_options`?
@Marvin please give an example of `with_options` to dynamically set a task runner, for instance
m
To dynamically set a task runner using `with_options` in Prefect, it seems the concept of task runners isn't explicitly tied to `with_options`. Instead, task runners are configured at the flow level or through configuration profiles. But worry not, let's first see how we can set up task runners in general and touch on dynamic settings.

### Prefect 3.x:

In Prefect 3.x, you configure the task runner at the flow level. You might use something like `ThreadPoolTaskRunner` or `DaskTaskRunner`. Here's an example:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def my_task():
    return "Hello, Prefect!"

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    my_task()

my_flow()  # call the flow object directly to run it
```
For dynamic selection at runtime, depending on some condition, you might not directly use `with_options` for this. Instead, it might involve logic before the flow execution:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask import DaskTaskRunner  # DaskTaskRunner comes from the prefect-dask package

@task
def my_task():
    return "Hello, dynamic task runner!"

def determine_task_runner():
    # Logic to determine which task runner to use
    return ThreadPoolTaskRunner()  # Or DaskTaskRunner() if conditions match

# Note: determine_task_runner() is evaluated once, when this decorator runs
@flow(task_runner=determine_task_runner())
def my_flow():
    my_task()

my_flow()
```
### Prefect 2.x:

You might use `with_options` to set other parameters, such as retries or tags, dynamically at the task level rather than directly for task runners, as there are no direct examples linking `with_options` to task runners in the Prefect 2.x documentation. For more details, see the Prefect documentation on task runners and on settings and profiles. Should you see a particular spot where you would like `with_options` used in a totally different way unexplored thus far, just let me know, and I'll dig deeper, just like an android excavating for hidden truths in the vast cosmos.
n
well that didn't work

```python
new_flow = old_flow.with_options(task_runner=DaskTaskRunner(...))
```

same goes for tasks: they have `with_options` too
n
oh yes, I tried, but sometimes I get `AttributeError: 'function' object has no attribute 'with_options'`
n
hmmm, all task and flow instances have this method, so if you saw that error you were probably calling it on an undecorated, plain function
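One common way to hit that error is stacking an ordinary decorator *above* `@flow`: the outer decorator returns a plain function, and even `functools.wraps` only copies the wrapped object's metadata and instance `__dict__`, not class methods such as `with_options`. The sketch below reproduces this with a hypothetical `FakeFlow` class standing in for Prefect's flow object; the decorator name `log_calls` is also made up for illustration.

```python
import functools


class FakeFlow:
    """Hypothetical stand-in for a Prefect flow object, which defines with_options."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

    def with_options(self, **options):
        # The real method returns a modified copy; this stub records the options.
        clone = FakeFlow(self.fn)
        clone.options = options
        return clone


def log_calls(fn):
    """An ordinary decorator: it always returns a plain function."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper


@log_calls  # outermost decorator wins: the result is a plain function
@FakeFlow
def broken_flow(n):
    return n * 2


@FakeFlow  # flow decorator on top: the result keeps with_options
@log_calls
def working_flow(n):
    return n * 2


print(hasattr(broken_flow, "with_options"))   # False
print(hasattr(working_flow, "with_options"))  # True
```

Keeping the flow decorator as the top-most one is the simplest fix in this sketch; the wrapped object is also still reachable through `broken_flow.__wrapped__`.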
n
I was calling it on a function with multiple decorators stacked above it. Removing them seems to work, but I'll run more tests.
a
@Nate @n3ura did you ever figure this out? I'm stuck on the same thing, though I'm using a much older version (2.15.0).
I tried writing a custom decorator to wrap the flow function and apply the task runner based on a flow param, but it seems task runners are not implemented as context managers in this version of Prefect, so I can't do `with task_runner:`, for example. Code to reproduce below; this results in `AttributeError: __enter__`.
ahhh, I actually just figured this out using `.with_options()`. Inside the wrapper function, we can just do

```python
new_flow_func = flow_func.with_options(task_runner=task_runner)
return new_flow_func(*args, **kwargs)
```

instead of using a context manager. Then everything works as described in the docstring for my decorator function above. Hope this is of use to others looking to do the same!
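Alex's decorator itself isn't included in this thread, so the following is only a sketch of the same idea, not his code: a wrapper pulls a hypothetical `dask_address` keyword out of the call, builds a runner with a caller-supplied factory, and runs `flow_obj.with_options(task_runner=...)` instead of entering a context manager. `with_dynamic_task_runner` and `make_runner` are illustrative names, the address is consumed by the wrapper rather than forwarded as a flow parameter (an assumption), and `FakeFlow` only stands in for a Prefect flow so the sketch runs without Prefect installed.

```python
from typing import Any, Callable, Optional


def with_dynamic_task_runner(
    flow_obj: Any,
    make_runner: Callable[[Optional[str]], Any],
    param: str = "dask_address",
) -> Callable[..., Any]:
    """Wrap a flow so each call can choose its own task runner (hypothetical helper)."""

    def run(*args: Any, **kwargs: Any) -> Any:
        # Remove the address so it is not passed on as a flow parameter.
        address = kwargs.pop(param, None)
        # with_options returns a new flow object; flow_obj itself is left unchanged.
        configured = flow_obj.with_options(task_runner=make_runner(address))
        return configured(*args, **kwargs)

    return run


class FakeFlow:
    """Stand-in for a Prefect flow: callable, with a with_options that copies."""

    def __init__(self, fn, task_runner=None):
        self.fn = fn
        self.task_runner = task_runner

    def with_options(self, task_runner=None):
        return FakeFlow(self.fn, task_runner=task_runner)

    def __call__(self, *args, **kwargs):
        # Report which runner this copy was configured with, plus the result.
        return self.task_runner, self.fn(*args, **kwargs)


def make_runner(address: Optional[str]) -> str:
    # With Prefect this would return DaskTaskRunner(address=address) or DaskTaskRunner().
    return f"dask@{address}" if address else "dask@local"


my_flow = FakeFlow(lambda n: n * 2)
run = with_dynamic_task_runner(my_flow, make_runner)
print(run(5))                                       # ('dask@local', 10)
print(run(5, dask_address="tcp://127.0.0.1:8786"))  # ('dask@tcp://127.0.0.1:8786', 10)
```

With real Prefect objects you would pass the decorated flow as `flow_obj` and a factory such as `lambda a: DaskTaskRunner(address=a) if a else DaskTaskRunner()`; that combination is untested here, and it only helps when you call the flow yourself rather than through a deployment.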
Coming back to this: I assume this kind of dynamic task runner selection wouldn't be possible if I'm using deployments rather than executing flows as scripts? I assume the task runner is "baked in" to the flow object definition once the deployment is created, regardless of whether it was initially set via the `@flow` decorator or `.with_options()`. For reference, I'm using `Deployment.build_from_flow(flow_obj)` to create deployments. I'm thinking my only option is to create multiple deployments explicitly, each with a different task runner, but I'm open to hearing other ideas.
n
hi @Alex Caruso would you be willing to create a discussion about this with an example for discoverability's sake?
a