Thread
#prefect-community

    Phil

    10 months ago
    Hi folks, I’m relatively new to Prefect and have what may be a beginner question; I hope you can help me out. So far I haven’t been able to answer it myself from the forum or the documentation. This is my use case: I want to extract some symbols from my internal database (A) and use them to extract external data for those symbols from the web as a Pandas DataFrame (B). After that, I transform the extracted external data and enrich it with some additional information, again as a Pandas DataFrame (C). Tasks B and C should be processed in parallel by calling `map`. Task A is called via the normal `run` method.
    Now I would like the results from task C to be reduced to a list of DataFrames so that I can save them to disk as a single feather file (D). According to the documentation, the reduce step happens automatically if I call the `run` method and the task expects a list. Up to task C, everything works fine, but I’m having some trouble with the reduce step and my custom class for task D. Task D expects a list of DataFrames (the output of the reduce step) and calls `pandas.concat` to merge the results back into one DataFrame, which it then saves to disk as a single file. But when I run the flow with my class, the following error occurs:
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    If I understand correctly, task D only receives a reference to the upstream task object instead of the reduced list of DataFrames. The same logic written with the `@task` decorator, on the other hand, works without any problems. The logic is identical in both versions, which is why I’m quite confused by the different results. Can someone please help me out? It’s probably just a small thing, but I just can’t find it 😕
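Task D’s core logic can be sketched independently of Prefect. This minimal sketch assumes the mapped results arrive as a plain Python list of DataFrames; `combine_frames` and `save_as_feather` are hypothetical names, and `to_feather` needs the optional `pyarrow` package to be installed.

```python
from typing import List

import pandas as pd


def combine_frames(frames: List[pd.DataFrame]) -> pd.DataFrame:
    """Reduce step: merge a list of per-symbol DataFrames into one DataFrame."""
    if not frames:
        # pd.concat raises on an empty list; fail early with a clearer message.
        raise ValueError("expected at least one DataFrame to combine")
    # ignore_index=True yields a fresh 0..n-1 index; the pandas docs state
    # that to_feather requires a default index.
    return pd.concat(frames, ignore_index=True)


def save_as_feather(frames: List[pd.DataFrame], path: str) -> None:
    """Combine the frames and write them to disk as a single feather file."""
    # to_feather relies on pyarrow being installed.
    combine_frames(frames).to_feather(path)
```

In a `Task` subclass, a helper like `save_as_feather` would be called from `run(self, frames)`, where `frames` is the list that a non-mapped task receives from a mapped upstream task.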
    Kevin Kho

    10 months ago
    Hi @Phil, I can’t tell if this is the issue, but you shouldn’t be calling the `run` method inside the flow. So if you have
    @task
    def abc(..):
        return ..
    then you would do:
    with Flow(...) as flow:
        abc(..)
    and for a class if you have:
    class ABC(Task):
        def run(self, ...):
            return ...
    it would be:
    abc_task = ABC(...) # init here
    with Flow(...) as flow:
        abc_task(...) # this calls the run
    or it would be:
    with Flow(...) as flow:
        ABC(init_here)(run_here)
    Inside the `Flow` context manager, calling the task only adds it to the flow; Prefect calls `run` for you automatically when the flow executes. Your use of `map` is correct, though.
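The difference between calling a task inside the flow and calling `run` directly can be illustrated with a toy, stdlib-only model of deferred execution. `Placeholder`, `ToyFlow`, and the sample tasks are hypothetical names, and this is not how Prefect is actually implemented: recording a call returns a placeholder, while calling the function directly runs it immediately with whatever it was given, which at build time is the placeholder rather than data.

```python
from typing import Any, Callable, Dict, List, Tuple


class Placeholder:
    """Stands in for a task's result while the flow is being built."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __iter__(self):
        # Iterating a placeholder is an error: there is no data yet at build time.
        raise TypeError(f"{self.name} is not iterable while the flow is being built")


class ToyFlow:
    """Toy model (not Prefect's code): record calls at build time, run them later."""

    def __init__(self) -> None:
        self.steps: List[Tuple[Callable, Tuple[Any, ...], Placeholder]] = []

    def call(self, func: Callable, *args: Any) -> Placeholder:
        # Building the flow only records the call and returns a placeholder.
        out = Placeholder(func.__name__)
        self.steps.append((func, args, out))
        return out

    def map(self, func: Callable, upstream: Placeholder) -> Placeholder:
        # A mapped step applies func to every item; its result is a plain list,
        # so a downstream non-mapped step receives the "reduced" list.
        out = Placeholder(f"{func.__name__}_mapped")
        self.steps.append((lambda items: [func(item) for item in items], (upstream,), out))
        return out

    def run(self) -> Dict[Placeholder, Any]:
        # Execution: swap each placeholder argument for its upstream's real result.
        results: Dict[Placeholder, Any] = {}
        for func, args, out in self.steps:
            real_args = [results[a] if isinstance(a, Placeholder) else a for a in args]
            results[out] = func(*real_args)
        return results


def extract_symbols() -> List[str]:
    return ["AAA", "BBB", "CC"]


def fetch(symbol: str) -> Dict[str, Any]:
    return {"symbol": symbol, "length": len(symbol)}


def combine(rows: List[Dict[str, Any]]) -> int:
    # Iterates its input, much like pd.concat does with a list of DataFrames.
    return sum(row["length"] for row in rows)


flow = ToyFlow()
symbols = flow.call(extract_symbols)
rows = flow.map(fetch, symbols)
total = flow.call(combine, rows)  # correct: recorded now, executed later
print(flow.run()[total])  # prints 8

try:
    combine(rows)  # wrong: runs now and receives the placeholder itself
except TypeError as exc:
    print(exc)  # prints: fetch_mapped is not iterable while the flow is being built
```

Loosely mapped onto Prefect 1.x, `flow.call(combine, rows)` corresponds to calling the task instance inside `with Flow(...)`, and the direct `combine(rows)` corresponds to calling `.run(...)` there, which plausibly explains why the original error complains about iterating a task object.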
    Also, I don’t think you need to define `__init__` like that for your class, because you inherit `Task.__init__`, which does the same thing and accepts the same keywords.
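The inheritance point can be shown in plain Python: a subclass that defines no `__init__` inherits its parent’s, keywords included. `BaseTask`, `LoadTask`, and `ConfiguredLoadTask` are hypothetical stand-ins, and the keywords are illustrative rather than Prefect’s full `Task` signature.

```python
from typing import Any, Optional


class BaseTask:
    """Hypothetical stand-in for a framework base class such as Prefect's Task."""

    def __init__(self, name: Optional[str] = None, max_retries: int = 0, **kwargs: Any) -> None:
        self.name = name or type(self).__name__
        self.max_retries = max_retries
        self.extra = kwargs

    def run(self, *args: Any) -> Any:
        raise NotImplementedError


class LoadTask(BaseTask):
    # No __init__ here: BaseTask.__init__ is inherited with all its keywords.
    def run(self, frames):
        return len(frames)


class ConfiguredLoadTask(BaseTask):
    # Only needed for extra settings: accept them, forward the rest to super().
    def __init__(self, path: str, **kwargs: Any) -> None:
        self.path = path
        super().__init__(**kwargs)

    def run(self, frames):
        return f"would write {len(frames)} frames to {self.path}"
```

If a custom `__init__` is genuinely needed, the usual pattern is the one in `ConfiguredLoadTask`: handle the new arguments explicitly and pass everything else on with `super().__init__(**kwargs)`.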

    Phil

    10 months ago
    Hey Kevin, thanks for the fast response. I really appreciate it, and I see what you mean. I’ll try it out tomorrow and report back.
    Kevin Kho

    Kevin Kho

    10 months ago
    Sounds good, just let us know. Responses will be a bit slow since we have a holiday tomorrow (Election Day), but we’ll still be around.

    Phil

    10 months ago
    You were absolutely right. Everything works now that I’ve removed the `run` calls from the flow definition; it works like a charm. Thank you very much for your help, and have a nice holiday!