# prefect-community
Hi guys! Beginner to Prefect here, and very excited to get up and running with it! I am currently trying to port our very basic and limited pipeline over to Prefect; after that we will iterate on it and take advantage of Prefect's offerings. Here is our current pipeline, with some extraneous details removed:
```python
from typing import List

# Source, Sink, Processor, PipelineConfiguration and the start_* factory
# functions come from our own modules (not shown here).


class BasePipeline:
    """A series of services that mutate a document object.

    The flow consists of a single 'source',
    followed by any number of 'processors', followed by one or more 'sinks'.
    The document object is passed along these services in a linear fashion.
    A pipeline configuration object is used to define the type of services
    and the order of the processors.
    """

    def __init__(self, blueprints: "PipelineConfiguration"):
        """Initializes and configures services from PipelineConfiguration.

        :param blueprints: a pipeline configuration object
        """
        self.project = blueprints.project
        self.source: "Source" = start_source(blueprints.source)
        self.sinks: List["Sink"] = [start_sink(sink_blueprint) for sink_blueprint in blueprints.sinks]
        self.processors: List["Processor"] = [start_processor(processor_blueprint) for processor_blueprint in blueprints.processors]

    def __call__(self, metrics: bool = False, failfast: bool = False) -> None:
        """Passes each document through every processor, then every sink."""
        # metrics/failfast handling removed for brevity
        # source.pull() is a generator, so documents are streamed one at a time
        for doc in self.source.pull():
            for processor in self.processors:
                doc = processor(doc)  # each processor returns the mutated document
            for sink in self.sinks:
                sink.push(doc)
```
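To see the linear source → processors → sinks flow end to end without the real services, here is a minimal self-contained sketch. `StubSource`, `StubSink`, `add_length` and `run_pipeline` are hypothetical stand-ins, not part of the original code; `run_pipeline` mirrors the loop in `BasePipeline.__call__`, pulling documents lazily from a generator one at a time.

```python
from typing import Callable, Iterator, List


class StubSource:
    """Hypothetical source that yields documents lazily, like source.pull()."""

    def __init__(self, docs: List[dict]):
        self.docs = docs

    def pull(self) -> Iterator[dict]:
        for doc in self.docs:
            yield dict(doc)  # copy so the input list is left untouched


class StubSink:
    """Hypothetical sink that records every pushed document."""

    def __init__(self):
        self.pushed: List[dict] = []

    def push(self, doc: dict) -> None:
        self.pushed.append(doc)


def add_length(doc: dict) -> dict:
    """Hypothetical processor: annotates the document and returns it."""
    doc["length"] = len(doc["text"])
    return doc


def run_pipeline(source, processors: List[Callable[[dict], dict]], sinks) -> None:
    # Same shape as BasePipeline.__call__: one document at a time,
    # through every processor in order, then out to every sink.
    for doc in source.pull():
        for processor in processors:
            doc = processor(doc)
        for sink in sinks:
            sink.push(doc)


sink_a, sink_b = StubSink(), StubSink()
run_pipeline(StubSource([{"text": "hi"}, {"text": "prefect"}]), [add_length], [sink_a, sink_b])
print(sink_a.pushed)  # [{'text': 'hi', 'length': 2}, {'text': 'prefect', 'length': 7}]
```

Because the generator is consumed inside the loop, only one document is in flight at a time; that streaming behavior is exactly what a task-based graph has trouble expressing.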
And here is my translation attempt:
```python
from prefect import Flow, task

# Blueprint, Source, Sink, Processor, Document, PipelineConfiguration and the
# start_* factory functions come from our own modules (not shown here).


@task
def init_source(project: str, source_config: Blueprint):
    return start_source(project, source_config)


@task
def init_sink(project: str, sink_config: Blueprint):
    return start_sink(project, sink_config)


@task
def init_processor(project: str, processor_config: Blueprint):
    return start_processor(project, processor_config)


@task
def run_source(source: Source):
    return source.pull()  # returns a generator


@task
def run_sink(sink: Sink, data: Document):
    return sink.push(data)


@task
def run_processor(processor: Processor, data: Document):
    return processor(data)


def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service initialization
        source = init_source(project, pipeline_config.source)
        processors = [init_processor(project, processor_config) for processor_config in pipeline_config.processors]
        sinks = [init_sink(project, sink_config) for sink_config in pipeline_config.sinks]

        # Perform ETL
        datastream = run_source(source)
        # While the flow is being built, run_source(source) returns a Task
        # object rather than the generator, so iterating it here fails.
        for doc in datastream:
            for processor in processors:
                doc = run_processor(processor, doc)
            for sink in sinks:
                run_sink(sink, doc)
```

The issue I'm running into is that Prefect reports `datastream` is not iterable. This may be a syntax issue on my end, but the output of `source.pull()` is a generator. Are generators supported in Prefect? If not, does anyone have a good workaround?
Hi @Christopher Harris, welcome to Prefect! Taking a look at your question now. 🙂
Because each task needs to finish before the downstream tasks kick off, we don't currently support tasks returning generators. As far as workarounds go, once you do have the results of the source, you could map over the values they hold. Does that sound like it would work for your use case? https://docs.prefect.io/core/concepts/mapping.html#simple-mapping
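The workaround needs two changes: the source step returns a finished collection instead of a generator, and all per-document work becomes a function of a single document, which is the shape a mapped task needs. The sketch below is plain Python so it runs without Prefect; `ListSource`, `CollectSink` and `process_document` are hypothetical stand-ins, and the list comprehension plays the role of mapping. In Prefect Core the mapped call would presumably look something like `process_document.map(docs, processors=unmapped(processors), sinks=unmapped(sinks))` with `process_document` decorated as a task, but the mapping docs linked above are the authority on the exact usage.

```python
from typing import Callable, Iterator, List


class ListSource:
    """Hypothetical stand-in for a Source whose pull() is a generator."""

    def __init__(self, docs: List[str]):
        self.docs = docs

    def pull(self) -> Iterator[str]:
        yield from self.docs


class CollectSink:
    """Hypothetical stand-in for a Sink; records what it receives."""

    def __init__(self):
        self.received: List[str] = []

    def push(self, doc: str) -> None:
        self.received.append(doc)


def run_source(source) -> List[str]:
    # list(...) forces the generator to run now, so when this step returns
    # its work is actually done and the result can be reused downstream.
    return list(source.pull())


def process_document(doc: str, processors: List[Callable[[str], str]], sinks) -> str:
    # All per-document work for ONE document: this is the unit that
    # would be mapped over the materialized list.
    for processor in processors:
        doc = processor(doc)
    for sink in sinks:
        sink.push(doc)
    return doc


source = ListSource(["a", "b", "c"])
processors = [str.upper, lambda d: d + "!"]
sink = CollectSink()

docs = run_source(source)
# Plain-Python stand-in for mapping: apply the function to each element.
results = [process_document(doc, processors, [sink]) for doc in docs]
print(results)  # ['A!', 'B!', 'C!']
```

The trade-off is memory: materializing the list holds every document from the source at once, whereas the original generator loop streamed them.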
For now that is a workaround we are considering. Thanks for the advice!