Christopher Harris
04/24/2020, 8:40 PM

```python
def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.
        sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
        # Perform ETL
        documents = pull(source)
        push.map(documents, sinks)
    flow.run(executor=LocalExecutor())
```
The problem with this approach is that it does a one-to-one mapping, like the first image. I want a many-to-one mapping, like the second image. Effectively, I am trying to recreate the following logic:
```python
for doc in documents:
    for sink in sinks:
        sink.push(doc)
```
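For context on the two pictures: Prefect's `.map` pairs mapped iterables element-wise, like `zip`, while the nested loop above wants a Cartesian product. A plain-Python illustration (the document and sink names here are made up):

```python
from itertools import product

documents = ["doc-1", "doc-2"]
sinks = ["sink-a", "sink-b", "sink-c"]

# push.map(documents, sinks) pairs the lists element-wise (zip-like),
# so each document reaches only one sink -- the "first image".
one_to_one = list(zip(documents, sinks))

# The desired behavior -- the "second image" -- is every document
# paired with every sink.
many_to_one = list(product(documents, sinks))
```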
Jenny
04/24/2020, 8:58 PM

```python
push.map(documents, unmapped(sinks))
```

```python
for sink in sinks:
    sink.push(doc)
```
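Jenny's suggestion keeps sinks unmapped, so push runs once per document and every run receives the whole sinks list. A plain-Python sketch of those semantics (the list-based stand-in sinks are purely illustrative):

```python
# Stand-in for the mapped push task: one call per document,
# with the full (unmapped) sinks list passed to every call.
def push(doc, sinks):
    for sink in sinks:
        sink.append(doc)  # stand-in for sink.push(doc)

documents = ["doc-1", "doc-2"]
sink_a, sink_b = [], []
sinks = [sink_a, sink_b]

for doc in documents:   # the mapped dimension
    push(doc, sinks)    # sinks stays whole on every call
```

Note the inner loop runs inside a single task, so the pushes for one document are sequential and retry at the granularity of the whole loop.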
Christopher Harris
04/24/2020, 9:37 PM

I'd want each push to be its own task.
As it stands, each document has to go through each sink in sequential order (which ideally it shouldn't). And if one of the sinks fails and we retry, we could end up with duplicate copies of a document in some of the sinks.

Jenny
04/24/2020, 9:47 PM

emre
04/24/2020, 10:01 PM

If source-sink is the list of combos, using pull.map(source-sink) will pull each source n times, n being the number of sinks. You could instead pull all documents first, then build a doc-sink combo list. That seems saner than the previous approach, but I actually don't know whether it will cause duplicate documents in memory.

Jenny
04/24/2020, 10:13 PM
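emre's combo-list idea can be sketched in plain Python: pull everything once, build the doc-sink pairs, and treat each pair as an independent push. The names are illustrative stand-ins; in the actual flow, a task could build the combo list and push could be mapped over it.

```python
from itertools import product

documents = ["doc-1", "doc-2"]   # pulled once, up front
sinks = ["sink-a", "sink-b"]

# Flatten into doc-sink combos so each push is its own unit of work
# and can fail or retry independently of the others.
combos = list(product(documents, sinks))

def push(doc, sink):
    return f"{doc}->{sink}"      # stand-in for the real push task

results = [push(doc, sink) for doc, sink in combos]
```

Because each pair is its own task run, a retry re-pushes only the failed doc-sink pair rather than re-pushing a document to sinks that already received it.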