# ask-community
p
Hi folks, I am relatively new to Prefect and have a beginner question that I hope you can help me with. I have not been able to answer it from the forum or the documentation so far.

This is my use case: I want to read some symbols from my internal database (A) and then extract external data from the web for those symbols as a Pandas DataFrame (B). After that I transform and enrich the extracted data, again as a Pandas DataFrame (C). Tasks B and C should be processed in parallel via the map function; Task A is called via the normal run method. Now I would like to reduce the results from Task C to a list of DataFrames and write them to disk as a single feather file (D). According to the documentation, the reduce step happens automatically if I call the run method and the task expects a list.

Up to Task C, everything works fine, but I have some trouble with the reduce step and my custom class for Task D. Task D expects a list of DataFrames (as produced by the reduce step) and calls pandas.concat to merge the results back into one DataFrame, which is then saved to disk as a single file. But if I run the flow with my class, the following error occurs:
```
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
```
If I got it right, Task D only gets a reference to the previous task object instead of the reduced list of DataFrames. The same logic with the task decorator, on the other hand, works without any problems. I see no difference between the two versions, which is why I am quite confused by the different results. Can someone please help me out? It's probably just a small thing, but I just can't find it 😕
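To illustrate that reading of the error, here is a minimal sketch in plain Python. `ToyTask`, `Transform`, and `Load` are hypothetical stand-ins, not Prefect classes; the only assumption is that a task object refuses iteration with a message like the one quoted above. Calling `run` by hand while wiring tasks together passes the upstream task object itself, because no upstream result exists yet.

```python
class ToyTask:
    """Hypothetical stand-in for a task object; not Prefect's real Task class."""

    def __iter__(self):
        # Assumption: task objects refuse iteration, as the quoted error suggests.
        raise TypeError("Task is not iterable.")


class Transform(ToyTask):
    def run(self, symbol):
        return {"symbol": symbol}


class Load(ToyTask):
    def run(self, frames):
        # Expects a real list of results, e.g. the reduced output of a mapped task.
        return [frame for frame in frames]


transform = Transform()
load = Load()

# Calling run() directly hands over the task object, not a list of results.
try:
    load.run(transform)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)  # Task is not iterable.
```

With a real list, for example `load.run([{"symbol": "AAA"}])`, the same `run` method works, which matches the observation that the logic itself is fine.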
k
Hi @Phil, I can’t tell if this is the issue, but you shouldn’t be calling the `run` method inside the flow. So if you have:
```python
@task
def abc(..):
    return ..
```
then you would do:
```python
with Flow(...) as flow:
    abc(..)
```
and for a class if you have:
```python
class ABC(Task):
    def run(self, ...):
        return ...
```
it would be:
```python
abc_task = ABC(...)  # init here
with Flow(...) as flow:
    abc_task(...)  # this calls the run
```
or it would be:
```python
with Flow(...) as flow:
    ABC(init_here)(run_here)
```
The Flow context manager calls the `run` for you automatically. The `map` is correct though.
Also, I think you don’t need to define `__init__` like that for your class, because you will inherit the `Task` `__init__`, which does the same thing and has the same keywords.
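The idea of calling the task instance and leaving `run` to the flow can be sketched with a toy model in plain Python. This is a heavily simplified illustration under stated assumptions, not Prefect's implementation: `ToyFlow`, `ToyTask`, `execute`, and the four task classes are hypothetical names, and plain dicts stand in for DataFrames. Inside the `with` block, calls only record which task depends on which; `run` is invoked later with real upstream results, and mapped results reach the final task as a list.

```python
class ToyFlow:
    """Hypothetical, simplified flow: records calls now, runs them later."""

    current = None

    def __init__(self):
        self.calls = []  # (task, upstream, mapped) in registration order

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow.current = None
        return False

    def execute(self):
        results = {}
        for task, upstream, mapped in self.calls:
            if upstream is None:
                results[task] = task.run()
            elif mapped:
                # One run() per upstream item; the outputs are collected in a list.
                results[task] = [task.run(item) for item in results[upstream]]
            else:
                # A non-mapped task receives the whole upstream result at once.
                results[task] = task.run(results[upstream])
        return results


class ToyTask:
    """Hypothetical task base: calling the instance only registers it."""

    def __call__(self, upstream=None):
        ToyFlow.current.calls.append((self, upstream, False))
        return self

    def map(self, upstream):
        ToyFlow.current.calls.append((self, upstream, True))
        return self


class GetSymbols(ToyTask):  # step A
    def run(self):
        return ["AAA", "BBB"]


class Extract(ToyTask):  # step B
    def run(self, symbol):
        return {"symbol": symbol}


class Transform(ToyTask):  # step C
    def run(self, row):
        return {**row, "enriched": True}


class Load(ToyTask):  # step D: stand-in for pandas.concat plus writing a file
    def run(self, rows):
        return len(list(rows))


get_symbols, extract, transform, load = GetSymbols(), Extract(), Transform(), Load()

with ToyFlow() as flow:
    symbols = get_symbols()
    extracted = extract.map(symbols)
    transformed = transform.map(extracted)
    load(transformed)  # call the instance, not load.run(transformed)

results = flow.execute()
print(results[load])  # 2
```

In this model, writing `load.run(transformed)` inside the `with` block would execute immediately with the `Transform` object as its argument, which is the situation described in the question.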
p
Hey Kevin, thanks for your fast response. I really appreciate it and see what you mean. I'll try it out tomorrow and then report back again
k
Sounds good, just let us know. Responses will be a bit slow as we have a holiday tomorrow (election day), but we will be around.
p
You were absolutely right. Everything works fine now that I have removed the `run` calls from the flow definition. It works like a charm. Thank you very much for your help and have a nice holiday!