Hi All! :wave: Thanks for building such amazing ...
# prefect-community
n
Hi All! 👋 Thanks for building such amazing tools! I am truly enjoying using them! My questions have to do with caching and passing results between flows. I have not understood how to use the
cache_key
key, though it is mentioned several places as a way of sharing results between flows. I want to be able to create multiple pipeline flows in separate python files (for readability) and run them all together through a job. As well, however, I want them to be able to share inputs and outputs so that I don't need to re-calculate the same tasks over and over again in the different flows. I will try to explain an example of what I am trying to do: First, imagine I have 2 different flows, the map_reduce flow and the ETL flows from the docs. I want them both to start from the same variable, the list
[1, 2, 3]
so I have a third flow that I define as just an extract flow.
extract_flow.py
Copy code
from prefect import task, Flow

@task
def extract():
    return [1, 2, 3]
etl_flow.py
Copy code
from prefect import task, Flow, Parameter

# ETL Flow

@task
def transform(data):
    return [i * 10 for i in data]


@task
def load(data):
    print("Here's your data: {}".format(data))


with Flow("ETL") as etl_flow:
    e = Parameter('data')
    print(b)
    t = transform(e)
    l = load(t)
map_reduce_flow.py
Copy code
from prefect import task, Flow, Parameter

# Map Reduce Flow

@task
def map_task(x):
    return x + 1


@task
def reduce_task(x):
    return sum(x)

@task
def print_task(x):
    print("Here's your data: {}".format(x))

with Flow("Map / Reduce 🤓") as mr_flow:
    numbers = Parameter('data')
    first_map = map_task.map(numbers)
    second_map = map_task.map(first_map)
    reduction = reduce_task(second_map)
    printing = print_task(reduction)
From here, I want to combine them into a combination flow that I can run with
python combine_flow.py
Copy code
from prefect import task, Flow
from etl_flow import etl_flow
from map_reduce_flow import mr_flow
from extract import extract


with Flow("combination_flow") as extract_flow:
    data= extract()    

extract_flow_state = extract_flow.run()

etl_flow_state = etl_flow.run(data=extract_flow_state.result[data].result)
mr_flow_state = mr_flow.run(data=extract_flow_state.result[data].result)
This gives the output (as expected!)