I apologize if this has been asked before. Is there any way that I can return a dictionary from one Task and use it as the input kwargs for another task? More specifically I want a task that returns a Dictionary and I want to pass its result to another task using variable unpacking...
dict_result = some_task()
some_other_task(**dict_result)
Is this doable? Is there a more Prefect-y way to accomplish this without creating a Parameter for each element of the dictionary? I ultimately want to pass as an input a list of dictionaries which I can then operate over using Prefect's mapping, so being able to define the collection of arguments together is important.
Kevin Kho
04/19/2022, 9:20 PM
Hey @Tim Wright, no worries, but next time this would be better in the #CL09KU1K7 channel. You can’t do this because the dictionary unpacking is evaluated at flow registration time, not at flow execution time, and the task's return value does not exist yet at registration.
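To illustrate the registration-time problem described above, here is a minimal plain-Python sketch. The `DeferredResult` class is a hypothetical stand-in for a task result that has not been computed yet; it is not Prefect's actual class, and the sketch only assumes that such a placeholder is not a mapping. Python evaluates `**` as soon as the call expression runs, so the unpacking fails before any task has executed.

```python
class DeferredResult:
    """Hypothetical placeholder for a value that only exists at run time."""

    def __init__(self, name: str) -> None:
        self.name = name


def some_other_task(**kwargs):
    # Simply echoes the keyword arguments it receives.
    return kwargs


# While the flow is being built, the "result" is only a placeholder.
dict_result = DeferredResult("some_task")

try:
    # `**` needs a real mapping right now, but none exists yet.
    some_other_task(**dict_result)
    unpack_error = None
except TypeError as exc:
    unpack_error = str(exc)

print(unpack_error)
```

The unpacking has to move into code that runs at execution time, which is what the rest of the thread works toward.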
Tim Wright
04/19/2022, 9:20 PM
Thanks @Kevin Kho, sorry about the wrong channel.
Kevin Kho
04/19/2022, 9:21 PM
So you need a task that pulls the appropriate item out of the dictionary, so that the lookup happens at runtime, or one that pulls a list of items in the case of mapping.
Kevin Kho
04/19/2022, 9:22 PM
The list order is respected anyway when mapping
Tim Wright
04/19/2022, 9:26 PM
So basically a task to fetch each argument from the dictionary during runtime and then pass those explicitly to the downstream task?
Kevin Kho
04/19/2022, 9:31 PM
Think like this:
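from typing import Dict, List

from prefect import Flow, task

@task
def get_item(raw: List[Dict], key):
    # Collect the value stored under `key` from every dictionary, preserving list order.
    res = []
    for r in raw:
        res.append(r[key])
    return res

with Flow(...) as flow:
    items = get_all_inputs()
    field1 = get_item(items, "field1")
    field2 = get_item(items, "field2")
    next_task.map(field1, field2)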
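As a rough illustration of what this pattern does to the data, the sketch below repeats the same steps in plain Python, without the `@task` decorator or a `Flow`, so it runs without Prefect installed. The sample `items` and the body of `next_task` are made up for the example; `zip` stands in for the element-wise pairing that mapping performs, relying on the list order being preserved as noted earlier in the thread.

```python
from typing import Any, Dict, List


def get_item(raw: List[Dict[str, Any]], key: str) -> List[Any]:
    # Pull the value under `key` from each dictionary, keeping list order.
    return [r[key] for r in raw]


def next_task(field1: Any, field2: Any) -> str:
    # Hypothetical downstream work: just combine the two arguments.
    return f"{field1}:{field2}"


# Made-up input: one dictionary of arguments per intended call.
items = [
    {"field1": "a", "field2": 1},
    {"field1": "b", "field2": 2},
]

field1 = get_item(items, "field1")  # one list per argument
field2 = get_item(items, "field2")

# Pair the lists element by element, as a mapped call would.
results = [next_task(f1, f2) for f1, f2 in zip(field1, field2)]
print(results)
```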
Tim Wright
04/19/2022, 9:32 PM
That's way slicker than I was thinking. Thanks @Kevin Kho
Kevin Kho
04/19/2022, 9:33 PM
You can compress it too
@task(nout=2)
def get_item(raw: List[Dict], key1, key2):
    res1 = []
    res2 = []
    for r in raw:
        res1.append(r[key1])
        res2.append(r[key2])
    return res1, res2
Kevin Kho
04/19/2022, 9:34 PM
with Flow(...) as flow:
    items = get_all_inputs()
    field1, field2 = get_item(items, "field1", "field2")
    next_task.map(field1, field2)
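A possible variation, not suggested in the thread itself, is to keep each dictionary whole and do the `**` unpacking inside a function body, where it runs at execution time. The sketch below is plain Python with hypothetical names (`some_other_task`, `call_with_kwargs`, and the sample `items`); the assumption is that `call_with_kwargs` would be decorated with `@task` and mapped over the list of dictionaries, so that each mapped run receives one dictionary.

```python
from typing import Any, Dict, List


def some_other_task(name: str, retries: int = 0) -> str:
    # Hypothetical function that expects ordinary keyword arguments.
    return f"{name} (retries={retries})"


def call_with_kwargs(kwargs: Dict[str, Any]) -> str:
    # The unpacking happens here, when the function body actually runs,
    # so the dictionary is a real value by then.
    return some_other_task(**kwargs)


# Made-up input: each dictionary holds the arguments for one call.
items: List[Dict[str, Any]] = [
    {"name": "extract", "retries": 2},
    {"name": "load"},
]

# A list comprehension stands in for mapping over the list.
results = [call_with_kwargs(kw) for kw in items]
print(results)
```

This keeps the collection of arguments together, at the cost of the wrapped function no longer appearing as its own task in the flow.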