Hi folks, if I return a dictionary from a task, is there a way to unpack that result into
kwargs
for the next task downstream?
Something like;
Copy code
# returns {"path": "some/restapi/path", "start_time": datetime...}
tasks = generate_tasks(first_set)
# Needs a path and start_time argument
histories = fetch_history.map(**tasks)
But this gives me the error
expression after ** must be a mapping with a "str" key
k
Kevin Kho
08/04/2021, 2:43 AM
Hey @Aiden Price, the quick answer is no and you'd need to pass the whole dictionary to the downstream task.
The long answer is that tasks are executed in a deferred fashion. But the list access and dictionary access like
x[0]
are evaluated during build time, when that
Task
hasn't resolved yet which causes this error. So you can't perform stuff like tuple unpacking because the values don't exist during build time.
You either need an intermediate task to plug out the values, or pass the whole dictionary, but something seems off here. If you perform the
map
, it has to be over a list. If you are supplying a
kwarg
that is the same for all mapped tasks, you mapped need to wrapped it with
unmapped
.
a
Aiden Price
08/04/2021, 2:46 AM
Ah whoops yes I should have removed that
map
call when I simplified my example. In actuality I'm returning a list of dictionaries and wanted to unpack each in the
map
.
k
Kevin Kho
08/04/2021, 2:47 AM
Ah that makes sense. I think unpacking it inside the task would be the right approach here
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.