n3ura
11/15/2024, 4:35 PM@flow
decorator:
@flow(
task_runner=get_dask_task_runner,
)
def get_dask_task_runner(address=None):
if address:
return DaskTaskRunner(address=address)
else:
return DaskTaskRunner()
This approach raised an error:
task_runner = stack.enter_context(self.flow.task_runner.duplicate())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'duplicate'
2. Calling the function directly:
If I call the function directly within the flow, I can't pass parameters dynamically.
@flow(task_runner=create_task_runner())
3. Manual handling inside the flow:
@flow(validate_parameters=False)
def my_flow(n: int, dask_address: str = None):
task_runner = get_task_runner(dask_address)
with task_runner:
result = my_task.submit(n)
return result.result()
1. This works but feels cumbersome and not as integrated as I'd like.
Code to Reproduce:
python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
def get_task_runner(dask_address: str = None):
return DaskTaskRunner(address=dask_address) if dask_address else DaskTaskRunner()
@task
def my_task(x):
return x * 2
@flow(validate_parameters=False)
def my_flow(n: int, dask_address: str = None):
task_runner = get_task_runner(dask_address)
with task_runner:
result = my_task.submit(n)
return result.result()
if __name__ == "__main__":
# Run with local Dask cluster
# print(my_flow(5))
# Run with remote Dask cluster
print(my_flow(5, dask_address=r"<tcp://127.0.0.1:8786>"))
Question:
Is there a better way to directly pass a dynamic Dask address to the @flow
decorator itself for cleaner and more seamless execution? Any suggestions or best practices would be greatly appreciated!
Thanks in advance for any help or insights!
Currently using prefect 3Nate
11/15/2024, 4:43 PMwith_options
?Nate
11/15/2024, 4:43 PMMarvin
11/15/2024, 4:44 PMwith_options
in Prefect, it seems the concept of task runners isn't explicitly tied to with_options
. Instead, task runners are configured at a flow level or through configuration profiles. But worry not, let's first see how we can set up task runners generally and touch on dynamic settings.
### Prefect 3.x:
In Prefect 3.x, you configure the task runner at the flow level. You might use something like ThreadPoolTaskRunner
or DaskTaskRunner
. Here’s an example:
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def my_task():
return "Hello, Prefect!"
@flow(task_runner=ThreadPoolTaskRunner)
def my_flow():
my_task()
my_flow.run()
For dynamic selection at runtime, depending on some condition, you might not directly use with_options
for this. Instead, it might involve logic before the flow execution:
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, DaskTaskRunner
@task
def my_task():
return "Hello, dynamic task runner!"
def determine_task_runner():
# Logic to determine which task runner to use
return ThreadPoolTaskRunner() # Or DaskTaskRunner if conditions match
@flow(task_runner=determine_task_runner())
def my_flow():
my_task()
my_flow.run()
### Prefect 2.x:
You might use with_options
to set other parameters like retries, tags, etc., dynamically at task level rather than directly for task runners, as there are no direct examples linking with_options
to task runners in Prefect 2.x documentation.
For more details, you can visit Prefect documentation on task runners and about settings and profiles.
Should you see a particular spot where you would like with_options
used in a totally different way unexplored thus far, just let me know, and I’ll dig deeper, just like an android excavating for hidden truths in the vast cosmos.Nate
11/15/2024, 4:44 PMnew_flow = old_flow.with_options(task_runner=DaskTaskRunner(...))
same goes for tasksn3ura
11/15/2024, 5:35 PMNate
11/15/2024, 5:35 PMoh yes I tried but sometimes I gethmmm all task and flow instances have this method so if you saw that error you probably were calling the method on an undecorated, normal function
n3ura
11/15/2024, 5:45 PMAlex Caruso
12/09/2024, 11:06 PM2.15.0
I tried writing a custom decorator to wrap the flow function and apply the task runner based on a flow param, but it seems that task runners are not implemented as context managers in this version of prefect, so I can't do with task_runner:
for example
Code to reproduce below - this results in AttributeError: __enter__
Alex Caruso
12/09/2024, 11:19 PM.with_options()
Inside the wrapper function, we can just do
new_flow_func = flow_func.with_options(task_runner=task_runner)
return new_flow_func(*args, **kwargs)
instead of using a context manager
Then everything works as described in the docstring for my decorator function above. Hope this can be of use to some others looking to do the same!Alex Caruso
12/10/2024, 3:34 AM@flow
decorator or .with_options()
For reference, I'm using Deployment.build_from_flow(flow_obj)
to create deployments
I'm thinking my only option is to create multiple deployments explicitly with different task runners, but open to hear any other ideasNate
12/10/2024, 2:46 PMAlex Caruso
12/10/2024, 3:17 PM