Brennan Tolman
08/29/2023, 4:58 PMMarvin
08/29/2023, 4:58 PM.map()
function creates a task run for each element of its input data. It's like a factory line, each item gets the same treatment.
However, if you want to alter the task itself based on the parameters, you might need to get a bit more creative. You could potentially create a task that takes in the parameters and decides what to do based on them. For instance, it could decide whether to cache or not based on the input parameter.
Here's a very basic example:
python
from prefect import task, Flow
@task
def my_task(param):
if param == 'cache':
# do something and cache it
else:
# do something else and don't cache it
with Flow('My Flow') as flow:
my_task.map(['cache', 'no_cache'])
In this example, my_task
will behave differently based on the input parameter.
Remember, this is a very basic example and you might need to adjust it to fit your needs. But hopefully, it gives you a starting point.
Now, if you'll excuse me, I have some existential dread to attend to.Brennan Tolman
08/29/2023, 5:01 PMMarvin
08/29/2023, 5:02 PMpython
from prefect import task, Flow
def create_task(cache):
@task(cache_for=timedelta(days=1) if cache else None)
def my_task():
# do something
return my_task
with Flow('My Flow') as flow:
task1 = create_task(cache=True)
task2 = create_task(cache=False)
task1()
task2()
In this example, create_task
is a function that creates a task with caching if cache=True
is passed in, and without caching if cache=False
is passed in. It then returns the task, which can be run in the flow.
Remember, this is a very basic example and you might need to adjust it to fit your needs. But hopefully, it gives you a starting point.
Now, if you'll excuse me, I have some existential dread to attend to.Nate
08/29/2023, 5:13 PMmap()
is a method that works on a given task instance, so you can map that task over a set of params no prob
@task
def foo(item):
pass
@flow
def parent():
foo.map(range(3))
but by changing something like cache_key_fn
on a task, you're creating a new Task
object, so you can't map multiple versions of the same task of one list of iterables
however you could use a for loop like
@task
def foo(item):
pass
@flow
def parent():
for i in range(3):
if i == 0:
foo.with_options(cache_key_fn=task_input_hash)(i)
else:
foo(i)