https://prefect.io logo
Title
l

Lukáš Pravda

08/08/2022, 4:25 PM
Hello community, I have, what I feel is a rather stupid question for prefect 1.x. I have a simple task, that needs an array as an argument (the list can be of arbitrary length and mixed types in my example, but using
List[int]
for simplicity here)
@task
def test_task(lst: List[int]):
    log = prefect.context.logger
    val = sum(lst)
    <http://log.info|log.info>(val)
    return val
What I’d like to do is to run a mapped task over, but I’m not entirely sure how to invoke it using
unmapped
(if this is even possible). When I call it like:
with Flow("test flow") as flow:
    p = Parameter("p", default=[[1, 2, 3], [2, 3, 4]])
    test_task.map(p)
it works as expected and the
test_task
is called with:
test_task([1,2,3])
test_task([2,3,4])
I’d like to achieve the following to be run, but with the use of unmapped task:
test_task(10, 1, 2)
test_task(20, 1, 2)
test_task(30, 1, 2)
so I tried
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map([p, unmapped(1), unmapped(2)])
this one fails with
int not being iterable
exception, while if I remove the array like this:
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map(p, unmapped(1), unmapped(2))
I’m getting “too many positional arguments error” I dont know how better should I put it to get what I need. Other solution is to come up with a task per array size, but that is just a lot. Any advice? Thank you very much for your help!
n

Nate

08/08/2022, 5:30 PM
Hey @Lukáš Pravda if you're mapping a task over an iterable, then you should write your
test_task
so that it expects an object with the type of the elements of that iterable and also if you're mapping, you wouldn't be able to interact with the entire list within the context of a task, but the result of that mapped task will be a list that you can use downstream like
from prefect import Flow, Parameter, task, unmapped

@task(log_stdout=True)
def test_task(val: int, arg1: str):
    print(f'my mapped arg: {val}')
    print(f'my unmapped arg: {arg1}')
    return val

@task
def some_task_expecting_a(list):
    # do things
    pass

with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    list_result = test_task.map(p, arg1=unmapped('1'))
    
    some_task_expecting_a(list_result)

if __name__ == "__main__":
    flow.run()
l

Lukáš Pravda

08/09/2022, 10:19 AM
Makes sense, I have not realised I can use a ‘dummy’ task to pregenerate the array for the actual task. Thanks @Nate!
👍 1