Christopher Harris
04/21/2020, 10:15 PM

from typing import List

class BasePipeline:
    """
    A series of services that mutate a document object.
    The flow consists of a single 'source', followed by any number of
    'processors', followed by one or more 'sinks'. The document object
    is passed along these services in a linear fashion. A pipeline
    configuration object defines the type of each service and the order
    of the processors.
    """

    def __init__(self, blueprints: "PipelineConfiguration"):
        """
        Initializes and configures services from PipelineConfiguration
        blueprints.

        :param blueprints: a pipeline configuration object
        """
        self.project = blueprints.project
        self.source: Source = start_source(blueprints.source)
        self.sinks: List[Sink] = [
            start_sink(sink_blueprint) for sink_blueprint in blueprints.sinks
        ]
        self.processors: List[Processor] = [
            start_processor(processor_blueprint)
            for processor_blueprint in blueprints.processors
        ]

    def __call__(self, metrics: bool = False, failfast: bool = False) -> None:
        """
        Passes each document from the source through the pipeline.
        """
        for doc in self.source.pull():
            for processor in self.processors:
                processor(doc)
            for sink in self.sinks:
                sink.push(doc)
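For reference, the class is meant to be driven like this; the config value below is a placeholder, not something defined in the thread:

# Hypothetical usage sketch: 'config' stands in for a real
# PipelineConfiguration built elsewhere in the project.
config = load_pipeline_configuration()  # placeholder helper, not real
pipeline = BasePipeline(config)
pipeline()  # pulls docs, runs each processor, pushes to every sink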
And here is my translation attempt:

from prefect import task, Flow

@task
def init_source(project: str, source_config: Blueprint):
    return start_source(project, source_config)

@task
def init_sink(project: str, sink_config: Blueprint):
    return start_sink(project, sink_config)

@task
def init_processor(project: str, processor_config: Blueprint):
    return start_processor(project, processor_config)

@task
def run_source(source: Source):
    return source.pull()

@task
def run_sink(sink: Sink, data: Document):
    return sink.push(data)

@task
def run_processor(processor: Processor, data: Document):
    return processor(data)

def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.source)
        processors = [
            init_processor(project, processor_config)
            for processor_config in pipeline_config.processors
        ]
        sinks = [
            init_sink(project, sink_config)
            for sink_config in pipeline_config.sinks
        ]
        # Perform ETL
        datastream = run_source(source)
        for doc in datastream:
            for processor in processors:
                doc = run_processor(processor, doc)
            for sink in sinks:
                run_sink(sink, doc)
    flow.run()
The issue I'm running into is that it says datastream is not iterable. While this may be a syntax issue on my end, the output of source.pull() is a generator. Are generators supported in Prefect? If not, does anyone have a good workaround?
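For context on the error: with the Prefect API in use here (1.x), calling a task inside a "with Flow(...)" block returns a Task placeholder rather than the task's actual output, so the for loop above is iterating over a Task object at flow-build time. A minimal sketch of the distinction, using a made-up numbers task:

from prefect import task, Flow

@task
def numbers():
    return [1, 2, 3]

with Flow("demo") as flow:
    ns = numbers()   # ns is a Task placeholder here, not the list [1, 2, 3]
    # for n in ns:   # would fail at build time: a Task is not iterable
    #     ...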
Zachary Hughes
04/21/2020, 10:16 PM
Rather than iterating directly over the output of source.pull(), you could map over the values held in datastream. Does that sound like it would work for your use case?
https://docs.prefect.io/core/concepts/mapping.html#simple-mapping
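A minimal sketch of that mapping approach, for reference: pull_all, process_doc, and push_doc are hypothetical helpers (not from the thread), and the key change is materializing the generator into a list inside a task so Prefect can fan out over the documents at runtime:

from typing import List
from prefect import task, Flow, unmapped

@task
def pull_all(source: Source) -> list:
    # Materialize the generator: a task result must be a concrete value
    # for downstream tasks to map over it.
    return list(source.pull())

@task
def process_doc(processors: List[Processor], doc: Document) -> Document:
    # Run the full processor chain on a single document.
    for processor in processors:
        processor(doc)
    return doc

@task
def push_doc(sinks: List[Sink], doc: Document) -> None:
    # Deliver one processed document to every sink.
    for sink in sinks:
        sink.push(doc)

def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("mapped-pipeline") as flow:
        source = init_source(project, pipeline_config.source)
        processors = [init_processor(project, c) for c in pipeline_config.processors]
        sinks = [init_sink(project, c) for c in pipeline_config.sinks]

        docs = pull_all(source)                                  # a list, not a generator
        processed = process_doc.map(unmapped(processors), docs)  # one mapped run per doc
        push_doc.map(unmapped(sinks), processed)                 # fan each doc out to sinks
    flow.run()

Mapping one run per document also lets Prefect track and retry each document independently, which the hand-written loop cannot do.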
Christopher Harris
04/24/2020, 8:20 PM