Lukáš Pravda
08/08/2022, 4:25 PM
List[int]
for simplicity here)
@task
def test_task(lst: List[int]):
    log = prefect.context.logger
    val = sum(lst)
    log.info(val)
    return val
What I’d like to do is run this as a mapped task, but I’m not entirely sure how to invoke it using unmapped (if this is even possible). When I call it like:
with Flow("test flow") as flow:
    p = Parameter("p", default=[[1, 2, 3], [2, 3, 4]])
    test_task.map(p)
it works as expected and test_task is called with:
test_task([1,2,3])
test_task([2,3,4])
I’d like the following calls to be run, but using unmapped:
test_task(10, 1, 2)
test_task(20, 1, 2)
test_task(30, 1, 2)
so I tried
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map([p, unmapped(1), unmapped(2)])
This one fails with an exception about int not being iterable, while if I remove the list like this:
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map(p, unmapped(1), unmapped(2))
I’m getting a “too many positional arguments” error.
I don’t know how else to write it to get what I need. The other solution is to come up with a task per array size, but that is just a lot. Any advice? Thank you very much for your help!
Nate
08/08/2022, 5:30 PM
test_task
so that it expects an object with the type of the elements of that iterable
Also, if you're mapping, you can't interact with the entire list within a single task run, but the result of that mapped task will be a list that you can use downstream, like:
from prefect import Flow, Parameter, task, unmapped

@task(log_stdout=True)
def test_task(val: int, arg1: str):
    print(f'my mapped arg: {val}')
    print(f'my unmapped arg: {arg1}')
    return val

@task
def some_task_expecting_a(list):
    # do things
    pass

with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    list_result = test_task.map(p, arg1=unmapped('1'))
    some_task_expecting_a(list_result)

if __name__ == "__main__":
    flow.run()
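To make the mapped/unmapped split above concrete for the original question (calls built from 10, 1, 2, then 20, 1, 2, then 30, 1, 2), here is a minimal plain-Python sketch of the calling pattern. It is only a model of the semantics and not Prefect's implementation: `Unmapped`, `map_calls`, and `sum_task` are hypothetical names made up for this illustration, and it assumes each constant is passed as its own keyword argument.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical stand-in for an 'unmapped' wrapper: marks a constant argument."""

    def __init__(self, value: Any) -> None:
        self.value = value


def map_calls(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call `fn` once per element of the mapped keyword arguments.

    Arguments wrapped in `Unmapped` are passed unchanged to every call;
    all other arguments are iterated over in lockstep.
    """
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    n = lengths.pop()
    return [fn(**{k: v[i] for k, v in mapped.items()}, **constant) for i in range(n)]


def sum_task(val: int, a: int, b: int) -> int:
    # Rebuild the per-call list inside the function, then sum it as in the question.
    return sum([val, a, b])


# One call per element of `val`; `a` and `b` stay fixed across calls.
results = map_calls(sum_task, val=[10, 20, 30], a=Unmapped(1), b=Unmapped(2))
print(results)  # [13, 23, 33]
```

Following the same pattern as `test_task.map(p, arg1=unmapped('1'))` above, the Prefect version would presumably declare one parameter per value on the task and call something like `test_task.map(val=p, a=unmapped(1), b=unmapped(2))`; that call is untested here.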
Lukáš Pravda
08/09/2022, 10:19 AM