Christopher Harris04/24/2020, 8:40 PM
The problem with this approach is it does a one to one mapping - like the first image. I want a many to one mapping, like the second image. Effectively i am trying to recreate the following logic
```python
def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.source)
        sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
        # Perform ETL
        documents = pull(source)
        push.map(documents, sinks)
    flow.run(executor=LocalExecutor())
```
```
for each document:
    for each sink:
        sink.push(doc)
```
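That nested loop can be sketched in plain Python as a cross product of documents and sinks, which is the many-to-one shape being asked for (the document and sink values below are hypothetical stand-ins, not the outputs of the flow above):

```python
from itertools import product

# Hypothetical stand-ins for the results of pull() and init_sink.map().
documents = ["doc1", "doc2", "doc3"]
sinks = ["elasticsearch", "postgres"]

# "for each document: for each sink: sink.push(doc)" enumerates
# the cross product of the two lists.
combos = list(product(documents, sinks))

# Every (document, sink) pair gets exactly one push.
assert len(combos) == len(documents) * len(sinks)  # 3 * 2 = 6 pushes
```

Each `(document, sink)` pair in `combos` is what would become an independent push, rather than one task per document that loops over all sinks.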
Jenny04/24/2020, 8:58 PM
```
for sink in sinks:
    sink.push(doc)
```
Christopher Harris04/24/2020, 9:37 PM
Each push should be its own task. As it stands, each document goes through each sink in sequential order (which ideally it shouldn't). And if one of the sinks fails and we retry, we could end up with duplicate copies of a document in some of the sinks.
Jenny04/24/2020, 9:47 PM
emre04/24/2020, 10:01 PM
… is the list of combos. Using … will pull each source n times, n being the number of sinks. You could instead first pull all documents, then go for a combo list. This seems more sane compared to before, but I actually don't know if it will cause duplicate documents in memory.
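A minimal plain-Python sketch of that pull-once-then-combos ordering (the `pull` and `push` bodies here are hypothetical placeholders; in the Prefect flow the final comprehension would be a `push.map` over the combo list):

```python
from itertools import product

def pull(source):
    # Hypothetical source: pretend it yields three documents.
    return [f"{source}-doc{i}" for i in range(3)]

def push(doc, sink):
    # Hypothetical sink: return a record of the delivery.
    return (sink, doc)

source = "my-source"
sinks = ["sink-a", "sink-b"]

# Pull ONCE up front, instead of once per sink.
documents = pull(source)

# Build the (document, sink) combo list, then push each pair
# independently -- a failed pair can be retried on its own
# without re-pushing to the sinks that already succeeded.
combos = list(product(documents, sinks))
deliveries = [push(doc, sink) for doc, sink in combos]

assert len(deliveries) == len(documents) * len(sinks)  # 3 * 2 = 6
```

Because `pull` runs before the combo list is built, each source is read once regardless of how many sinks there are.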
Jenny04/24/2020, 10:13 PM