#prefect-community

Christopher Harris

04/24/2020, 8:40 PM
Question #2: Here is an example of a flow I’m using. The idea is that we pull a list of “documents” from a single source and push each document to every sink.
def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.
        sinks = init_sink.map(unmapped(project), pipeline_config.sinks)

        # Perform ETL
        documents = pull(source)
        push.map(documents, sinks)

    flow.run(executor=LocalExecutor())
The problem with this approach is that it does a one-to-one mapping, like the first image. I want a many-to-one mapping, like the second image. Effectively, I am trying to recreate the following logic:
for each document:
    for each sink:
        sink.push(doc)
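
To make the difference concrete, here is a minimal plain-Python sketch with no Prefect involved. It assumes that mapping over two lists pairs elements by position, much like `zip`, whereas the nested loop above corresponds to a Cartesian product. The document and sink names are made up for illustration.

```python
from itertools import product

documents = ["doc_a", "doc_b", "doc_c"]  # hypothetical documents
sinks = ["sink_1", "sink_2", "sink_3"]   # hypothetical sinks

# Position-wise pairing: element i of one list goes with element i of the other.
one_to_one = list(zip(documents, sinks))

# Every document paired with every sink: the nested-loop behaviour being asked for.
every_pair = list(product(documents, sinks))

print(len(one_to_one))  # 3
print(len(every_pair))  # 9
```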

Jenny

04/24/2020, 8:58 PM
Hi @Christopher Harris - thanks for the question! Let me check that for you!
@Christopher Harris How about:
push.map(documents, unmapped(sinks))
And then within the push task add:
for sink in sinks:
    sink.push(doc)
or vice versa!

Christopher Harris

04/24/2020, 9:37 PM
Ha! That’s actually what I was doing prior. Overall it is fine, but it would be nice to have each call to `push` be its own task. As it stands, each document needs to go through each sink in sequential order (which ideally it shouldn’t). And if one of the sinks fails and we retry, we could end up with duplicate copies of a document in some of the sinks.
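
The duplicate concern can be reproduced in a few lines of plain Python. This is only a sketch: `ListSink`, `push_all`, and `run_with_retry` are hypothetical stand-ins, and the retry helper is a naive imitation of a task-level retry, not Prefect's own retry mechanism.

```python
class ListSink:
    """Hypothetical in-memory sink that fails on its first `fail_times` pushes."""

    def __init__(self, name, fail_times=0):
        self.name = name
        self.fail_times = fail_times
        self.stored = []

    def push(self, doc):
        if self.fail_times > 0:
            self.fail_times -= 1
            raise RuntimeError(f"{self.name} unavailable")
        self.stored.append(doc)


def push_all(doc, sinks):
    # One unit of work handles every sink sequentially.
    for sink in sinks:
        sink.push(doc)


def run_with_retry(func, *args, max_retries=1):
    # Naive stand-in for a task-level retry: rerun the whole function.
    for attempt in range(max_retries + 1):
        try:
            return func(*args)
        except RuntimeError:
            if attempt == max_retries:
                raise


reliable = ListSink("reliable")
flaky = ListSink("flaky", fail_times=1)

run_with_retry(push_all, "doc_a", [reliable, flaky])

print(reliable.stored)  # ['doc_a', 'doc_a']  (duplicate caused by the retry)
print(flaky.stored)     # ['doc_a']
```

Because the retry reruns the whole loop, the sink that already succeeded receives the document a second time. Retrying at the level of a single (document, sink) pair would avoid that.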

Jenny

04/24/2020, 9:47 PM
Ha! Ok. Let me see if there are any other ideas on how we could tackle this one.

emre

04/24/2020, 10:01 PM
Have you tried gathering the source list and sink list into a single task to generate a list of all source-sink combinations? Many-to-many mappings in Prefect have usually been handled by generating a list of combinations. However, this may not be efficient in your case. Let's say `source-sink` is the list of combos. Using `pull.map(source-sink)` will pull each source n times, n being the number of sinks. You could instead pull all documents first, then build a `doc-sink` combo list. This seems more sane than the first option, but I actually don’t know if it will cause duplicate documents in memory.
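
A rough plain-Python sketch of the `doc-sink` combo idea follows. The names `make_pairs` and `push_pair` are hypothetical; in a flow, `make_pairs` would presumably be wrapped as a task and the push task mapped over its result, which is an assumption rather than something confirmed in this thread.

```python
from itertools import product


def make_pairs(documents, sinks):
    # Build every (document, sink) combination once, after pulling.
    return list(product(documents, sinks))


def push_pair(pair, storage):
    # One unit of work per (document, sink) pair, so each can be retried alone.
    doc, sink_name = pair
    storage.setdefault(sink_name, []).append(doc)


documents = [{"id": "a"}, {"id": "b"}]   # hypothetical pulled documents
sinks = ["sink_1", "sink_2", "sink_3"]   # hypothetical sink names
storage = {}

pairs = make_pairs(documents, sinks)
for pair in pairs:  # stand-in for mapping a task over `pairs`
    push_pair(pair, storage)

print(len(pairs))                   # 6
print(pairs[0][0] is documents[0])  # True: the pair holds a reference, not a copy
```

Within a single process the pairs only hold references to the original documents, so nothing is duplicated in memory. Whether that still holds with an executor that serializes task inputs is not something this sketch shows.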

Jenny

04/24/2020, 10:13 PM
Thanks Emre. @Christopher Harris - can you let us know if that works for you?