Hi all! Trying to construct some simple pipeline. ...
# prefect-community
a
Hi all! Trying to construct some simple pipeline. One of tasks return dict, that I want to unpack in another one:
Copy code
with Flow("Data collection") as flow:
    params = task(params)
    another_task(**params)
but python raise an error
Copy code
TypeError: FunctionTask object argument after ** must be a mapping, not FunctionTask
Is there any way to unpack dict from task?
oh Looks like i've got it For unpacking objects class has to have getitem and keys methods, so as far as getitem already defined we have to bind to Task some realization of keys I'll try to do it by myself, but if someone already done it please share it
j
@Artem Andrienko, when you call
task(params)
inside your
with Flow
block, you are not actually calling your task function quite yet. Instead, Prefect is observing that call and building up a computational graph that will be executed later. Therefore, the resulting variable that you called
params
is not actually a dictionary, but a Prefect
Task
object; the dictionary will not be produced until you actually call
flow.run()
. What you should probably do in your flow definition is pass the entire
params
variable to
another_task
like this:
another_task(params)
and unpack the dictionary inside the task, where you can provide any custom logic. When the flow is run, Prefect will collect the dictionary output from
task
and pass it to
another_task
appropriately.
More generally: you can only work with the “realized” result of a task from inside another task, not while building the flow.
i
@Artem Andrienko a task can return a dict. see if this example helps you