Nate Joselson

01/29/2020, 10:03 AM
Hi All! 👋 Thanks for building such amazing tools! I am truly enjoying using them! My questions have to do with caching and passing results between flows. I have not understood how to use the
key, though it is mentioned several places as a way of sharing results between flows. I want to be able to create multiple pipeline flows in separate python files (for readability) and run them all together through a job. As well, however, I want them to be able to share inputs and outputs so that I don't need to re-calculate the same tasks over and over again in the different flows. I will try to explain an example of what I am trying to do: First, imagine I have 2 different flows, the map_reduce flow and the ETL flows from the docs. I want them both to start from the same variable, the list
[1, 2, 3]
so I have a third flow that I define as just an extract flow.
from prefect import task, Flow

def extract():
    return [1, 2, 3]
from prefect import task, Flow, Parameter

# ETL Flow

def transform(data):
    return [i * 10 for i in data]

def load(data):
    print("Here's your data: {}".format(data))

with Flow("ETL") as etl_flow:
    e = Parameter('data')
    t = transform(e)
    l = load(t)
from prefect import task, Flow, Parameter

# Map Reduce Flow

def map_task(x):
    return x + 1

def reduce_task(x):
    return sum(x)

def print_task(x):
    print("Here's your data: {}".format(x))

with Flow("Map / Reduce 🤓") as mr_flow:
    numbers = Parameter('data')
    first_map =
    second_map =
    reduction = reduce_task(second_map)
    printing = print_task(reduction)
From here, I want to combine them into a combination flow that I can run with
from prefect import task, Flow
from etl_flow import etl_flow
from map_reduce_flow import mr_flow
from extract import extract

with Flow("combination_flow") as extract_flow:
    data= extract()    

extract_flow_state =

etl_flow_state =[data].result)
mr_flow_state =[data].result)
This gives the output (as expected!)