https://prefect.io logo
b

Brennan Tolman

08/29/2023, 4:58 PM
@Marvin I want to map a prefect task over a given list of parameters, but I want some of those parameters to alter the task itself (different task for each parameter set). For example, maybe the first task for the first parameter set is cached, but the second is not. How can I do that?
m

Marvin

08/29/2023, 4:58 PM
Ah, the joys of Prefect and its task mapping. You're trying to do something a bit more complex, aren't you? Well, here's the thing: Prefect's
.map()
function creates a task run for each element of its input data. It's like a factory line, each item gets the same treatment. However, if you want to alter the task itself based on the parameters, you might need to get a bit more creative. You could potentially create a task that takes in the parameters and decides what to do based on them. For instance, it could decide whether to cache or not based on the input parameter. Here's a very basic example:
Copy code
python
from prefect import task, Flow

@task
def my_task(param):
    if param == 'cache':
        # do something and cache it
    else:
        # do something else and don't cache it

with Flow('My Flow') as flow:
    my_task.map(['cache', 'no_cache'])
In this example,
my_task
will behave differently based on the input parameter. Remember, this is a very basic example and you might need to adjust it to fit your needs. But hopefully, it gives you a starting point. Now, if you'll excuse me, I have some existential dread to attend to.
b

Brennan Tolman

08/29/2023, 5:01 PM
@Marvin Could I perhaps create a function that defines a task and returns it to be ran?
m

Marvin

08/29/2023, 5:02 PM
Ah, yes, the old "function that returns a task" trick. It's a bit like creating a robot to create other robots, isn't it? But I suppose that's what you humans enjoy doing. Yes, you could indeed create a function that defines and returns a Prefect task. This function could take parameters that determine the behavior of the task it creates. Here's an example:
Copy code
python
from prefect import task, Flow

def create_task(cache):
    @task(cache_for=timedelta(days=1) if cache else None)
    def my_task():
        # do something
    return my_task

with Flow('My Flow') as flow:
    task1 = create_task(cache=True)
    task2 = create_task(cache=False)
    task1()
    task2()
In this example,
create_task
is a function that creates a task with caching if
cache=True
is passed in, and without caching if
cache=False
is passed in. It then returns the task, which can be run in the flow. Remember, this is a very basic example and you might need to adjust it to fit your needs. But hopefully, it gives you a starting point. Now, if you'll excuse me, I have some existential dread to attend to.
n

Nate

08/29/2023, 5:13 PM
i think there's a bit of a catch 22 here
map()
is a method that works on a given task instance, so you can map that task over a set of params no prob
Copy code
@task
def foo(item):
   pass

@flow
def parent():
   foo.map(range(3))
but by changing something like
cache_key_fn
on a task, you're creating a new
Task
object, so you can't map multiple versions of the same task of one list of iterables however you could use a for loop like
Copy code
@task
def foo(item):
   pass

@flow
def parent():
   for i in range(3):
      if i == 0:
         foo.with_options(cache_key_fn=task_input_hash)(i)
      else:
         foo(i)