
alex

07/07/2020, 4:27 PM
Has anyone else run into an issue with duplicate tasks being created? I have a list of Task instances that I would like to run in parallel.
with Flow("My flow", schedule=schedule) as flow:
    for source in all_sources:
        source.bind(
            ## parameters for my run() function here
        )
    res = [source() for feed in all_sources]
In the schematic and Gantt chart I have a DAG with all the sources leading to a list, which is expected, but I also have duplicated source tasks without any edges.

Zachary Hughes

07/07/2020, 4:33 PM
Hi @alex, could you share your task definitions? Even if they're stubbed out for privacy, would be helpful in figuring out what's up.

alex

07/07/2020, 4:40 PM
Hi @Zachary Hughes, removing the bind() call resolved that issue. I think maybe I was just using it incorrectly?

Zachary Hughes

07/07/2020, 4:59 PM
Hmm, I'm not immediately sure. Is this the entirety of the flow? Using bind creates a keyed edge. I'm wondering if there's some peculiarity in visualization between keyed edges and non-keyed edges.

alex

07/07/2020, 5:29 PM
Yup that's the entire flow, my class/task looks something like this.
class DerivedMeta(SignatureValidator, type(ABC)):
    pass


class BaseSource(Task, metaclass=DerivedMeta):
    # has run method w custom parameters
    ...


class DerivedClasses(BaseSource):
    # no run implementation, uses parent's
    pass
all_sources is a list[DerivedClasses].

Zachary Hughes

07/07/2020, 6:16 PM
Ah, you know what, I think I know what's going on at second glance. By calling bind, you create edges like you'd expect to see. But when you then call res = [source() for feed in all_sources], you create a separate set of tasks without edges. That would explain the duplicate set of tasks you were seeing.
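The explanation above can be sketched with a toy model. This is not Prefect's code: ToyFlow and ToyTask are hypothetical stand-ins, and the sketch assumes that calling a task registers a separate copy of it while bind registers the original object and adds a keyed edge per argument.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: just a task list and an edge list."""

    def __init__(self):
        self.tasks = []
        self.edges = []  # (upstream value, task, keyword) triples


class ToyTask:
    """Hypothetical stand-in for a task."""

    def __init__(self, name):
        self.name = name

    def bind(self, flow, **kwargs):
        # Register this exact object and add one keyed edge per argument.
        if not any(t is self for t in flow.tasks):
            flow.tasks.append(self)
        for key, value in kwargs.items():
            flow.edges.append((value, self, key))
        return self

    def __call__(self, flow, **kwargs):
        # Assumption: calling works on a copy, so the original is untouched.
        new = copy.copy(self)
        return new.bind(flow, **kwargs)


flow = ToyFlow()
all_sources = [ToyTask("feed_a"), ToyTask("feed_b")]

for source in all_sources:
    source.bind(flow, url="https://example.com")  # two tasks, two edges

res = [source(flow) for source in all_sources]  # two more tasks, no edges

connected = {id(task) for _, task, _ in flow.edges}
orphans = [t.name for t in flow.tasks if id(t) not in connected]
print(len(flow.tasks), orphans)  # prints: 4 ['feed_a', 'feed_b']
```

Under these assumptions the flow ends up with four tasks for two sources, and the two created by the bare calls have no edges, which matches the duplicates seen in the schematic.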

alex

07/07/2020, 7:11 PM
Makes sense, thanks! I'm not sure I really understand bind. What would be the best way to achieve what I am trying to do? Would just using res = [source(**run_kwargs) for feed in all_sources] instead of bind() be OK?

Jim Crist-Harif

07/07/2020, 7:34 PM
Yeah, usually you want to call tasks like functions. bind can be useful, but we recommend using the functional API when possible.
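A minimal sketch of the call-style pattern recommended here, again using a hypothetical ToyTask stand-in rather than Prefect itself. The registry list, the task names, and the limit parameter are illustrative assumptions. Note that the comprehension calls its own loop variable; the snippets posted in this thread iterate with feed but call source, which would call whatever source was last bound to.

```python
import copy


class ToyTask:
    """Hypothetical stand-in for a task; calling it registers one bound copy."""

    registry = []  # hypothetical stand-in for the flow's set of tasks

    def __init__(self, name):
        self.name = name
        self.inputs = {}

    def __call__(self, **run_kwargs):
        # One call produces one registered copy carrying its own inputs.
        bound = copy.copy(self)
        bound.inputs = dict(run_kwargs)
        ToyTask.registry.append(bound)
        return bound


all_sources = [ToyTask("feed_a"), ToyTask("feed_b")]
run_kwargs = {"limit": 10}  # hypothetical run() parameters

# The loop variable and the called object match: one bound copy per source.
res = [source(**run_kwargs) for source in all_sources]
```

With no separate bind step, each source contributes exactly one task, and that task carries its inputs.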