Thread
#prefect-community
    a

    alex

    2 years ago
    Has anyone else run into an issue with duplicate tasks being created? I have a list of Task instances that I would like to run in parallel.
    with Flow("My flow", schedule=schedule) as flow:
            for source in all_sources:
                source.bind(
                    ## parameters for my run() function here
                )
            res = [source() for feed in all_sources]
    In the schematic and gantt chart I have a dag with all the
    sources
    leading to a list, which is expected but I also duplicated
    source
    tasks without any edges
    Zachary Hughes

    Zachary Hughes

    2 years ago
    Hi @alex, could you share your task definitions? Even if they're stubbed out for privacy, would be helpful in figuring out what's up.
    a

    alex

    2 years ago
    Hi @Zachary Hughes, removing the
    bind()
    call resolved that issue, I think maybe I was just using it incorrectly?
    Zachary Hughes

    Zachary Hughes

    2 years ago
    Hmm, I'm not immediately sure. Is this the entirety of the flow? Using
    bind
    creates a keyed edge-- I'm wondering if there's some peculiarity in visualization between keyed edges and non-keyed edges.
    a

    alex

    2 years ago
    Yup that's the entire flow, my class/task looks something like this.
    class DerivedMeta(SignatureValidator, type(ABC)):
        pass
    
    
    class BaseSource(Task, metaclass=DerivedMeta):
        # has run method w custom parameters 
    
    class DerivedClasses(BaseSource):
         # no run implementation, uses parent's
    all_sources
    is a list[DerivedClasses]
    Zachary Hughes

    Zachary Hughes

    2 years ago
    Ah, you know what-- I think I know what's going on at second glance. By calling bind, you create edges like you'd expect to see. But when you then call
    res = [source() for feed in all_sources]
    , you create a separate set of tasks without edges. That would explain the duplicate set of tasks you were seeing.
    a

    alex

    2 years ago
    Makes sense, thanks! I'm not sure I really understand
    bind
    , what would the best way to achieve what I am trying to do be? Would just using
    res = [source(**run_kwargs) for feed in all_sources]
    instead of bind() be ok?
    Jim Crist-Harif

    Jim Crist-Harif

    2 years ago
    Yeah, usually you want to call tasks like functions -
    bind
    can be useful, but we recommend using the functional api when possible.