# prefect-community
c
Hi guys! Beginner to Prefect here - very much excited to get up and running on it! I am currently trying to translate our current, very basic/limited pipeline over to Prefect - after which we will iterate on it and take advantage of Prefect's offerings. Here is our current pipeline - with some extraneous stuff removed:
class BasePipeline:
    """
    A series of services that mutate a document object.
    The flow consists of a single 'source',
    followed by x number of 'processors', followed by a 'sink.'
    The document object is passed along these services in a linear fashion.
    A pipeline configuration object is used to define the type of services
    and the order of the processors.
    """

    def __init__(self, blueprints: "PipelineConfiguration"):
        """
        Initializes and configures services from PipelineConfiguration
        blueprints.
        :param blueprints: a pipeline configuration object
        """
        self.project = blueprints.project
        self.source: Source = start_source(blueprints.source)
        self.sinks: List[Sink] = [start_sink(sink_blueprint) for sink_blueprint in blueprints.sinks]
        self.processors: List[Processor] = [start_processor(processor_blueprint) for processor_blueprint in blueprints.processors]

    def __call__(self, metrics: bool = False, failfast: bool = False) -> None:
        """
        Passes a document through the pipeline.
        """

        for doc in self.source.pull():
            for processor in self.processors:
                processor(doc)
            for sink in self.sinks:
                sink.push(doc)
And here is my translation attempt:
from prefect import Flow, task


@task
def init_source(project: str, source_config: Blueprint):
    return start_source(project, source_config)


@task
def init_sink(project: str, sink_config: Blueprint):
    return start_sink(project, sink_config)


@task
def init_processor(project: str, processor_config: Blueprint):
    return start_processor(project, processor_config)


@task
def run_source(source: Source):
    return source.pull()


@task
def run_sink(sink: Sink, data: Document):
    return sink.push(data)


@task
def run_processor(processor: Processor, data: Document):
    return processor(data)


def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.source)
        processors = [init_processor(project, processor_config) for processor_config in pipeline_config.processors]
        sinks = [init_sink(project, sink_config) for sink_config in pipeline_config.sinks]

        # Perform ETL
        datastream = run_source(source)
        for doc in datastream:
            for processor in processors:
                doc = run_processor(processor, doc)
            for sink in sinks:
                run_sink(sink, doc)

    flow.run()
The issue I'm running into is that Prefect says datastream is not iterable. While this may be a syntax issue on my end, the output of source.pull() is a generator. Are generators supported in Prefect? If not - does anyone have a good workaround?
z
Hi @Christopher Harris, welcome to Prefect! Taking a look at your question now. 🙂
Because each task needs to finish before its downstream tasks kick off, we don't currently support tasks returning generators. As far as workarounds go, once you do have the results of `source.pull()`, you could map over the values held in `datastream`. Does that sound like it would work for your use case? https://docs.prefect.io/core/concepts/mapping.html#simple-mapping
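To make the shape of that workaround concrete, here is a rough plain-Python sketch of the data flow, not Prefect code. It assumes the source task drains the generator into a list and that processors mutate the document in place, as in the original `BasePipeline`. `Document`, `pull`, `uppercase`, and `tag_length` are hypothetical stand-ins. In a flow, the list comprehension would presumably become something like `run_processor.map(unmapped(processor), docs)`.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class Document:
    """Hypothetical stand-in for the pipeline's document object."""
    text: str
    tags: List[str] = field(default_factory=list)


def pull() -> Iterator[Document]:
    """Stand-in for source.pull(): a generator of documents."""
    for text in ("alpha", "beta", "gamma"):
        yield Document(text)


def run_source() -> List[Document]:
    # Drain the generator inside the task so it returns a plain list
    # that downstream stages can be mapped over.
    return list(pull())


def run_processor(processor: Callable[[Document], None], doc: Document) -> Document:
    # Processors here mutate the document in place and return None,
    # so hand the document back explicitly for the next stage.
    processor(doc)
    return doc


def uppercase(doc: Document) -> None:
    doc.text = doc.text.upper()


def tag_length(doc: Document) -> None:
    doc.tags.append(f"len={len(doc.text)}")


def run_pipeline(processors: List[Callable[[Document], None]]) -> List[Document]:
    docs = run_source()
    for processor in processors:
        # One "mapped" stage per processor: every document goes through
        # this processor before the next processor starts.
        docs = [run_processor(processor, doc) for doc in docs]
    return docs


results = run_pipeline([uppercase, tag_length])
for doc in results:
    print(doc.text, doc.tags)
```

Two trade-offs to keep in mind: draining the generator means every document sits in memory at once, and the work now runs stage by stage rather than one document at a time through the whole chain.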
c
For now that is a workaround we are considering. Thanks for the advice!