Gabe Grand
04/12/2021, 4:40 PM
flow_1 (preprocessing) -> flow_2 (training) -> flow_3 (deployment)

Gabe Grand
04/12/2021, 4:41 PM
# Flow-of-flows definition
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# Each StartFlowRun task kicks off an already-registered flow;
# wait=True blocks until the child flow run finishes.
flow_1 = StartFlowRun(flow_name="preprocessing", project_name="examples", wait=True)
flow_2 = StartFlowRun(flow_name="training", project_name="examples", wait=True)
flow_3 = StartFlowRun(flow_name="deployment", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    r1 = flow_1()
    r2 = flow_2(upstream_tasks=[r1])  # training waits on preprocessing
    r3 = flow_3(upstream_tasks=[r2])  # deployment waits on training

Gabe Grand
04/12/2021, 4:41 PM
Now I want to add a flow_0 that checks an external data source for new data on some routine schedule. I want to have flow_0 trigger the rest of the pipeline, but only when there is new data. I'm considering the following two solutions:

SOLUTION 1
My first instinct was to add flow_0 to the flow-of-flows, but I haven't been able to find a clean way to use the result of flow_0 to determine whether to run the rest of the pipeline. Digging further, it seems like passing data between flows in a flow-of-flows is not yet supported, but it's possible that I'm not fully understanding the situation.

SOLUTION 2
An alternative would be to append a StartFlowRun(flow_name="parent-flow") as the final task of flow_0 and make it conditional on whether there is new data (roughly as in the sketch below). I know this will work, but it feels less clean than having all of the flows inside a single parent-flow pipeline that runs on a schedule.
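(For illustration, a minimal sketch of Solution 2; check_for_new_data and the path it checks are hypothetical, not from the original thread:)

import os

from prefect import Flow, case, task
from prefect.tasks.prefect import StartFlowRun

@task
def check_for_new_data() -> bool:
    # Hypothetical check against the external data source; here a stand-in
    # that looks for a file on disk.
    return os.path.exists("/data/incoming/new_batch.csv")

# Final task of flow_0: kick off the existing parent pipeline.
start_parent = StartFlowRun(flow_name="parent-flow", project_name="examples")

with Flow("flow_0") as flow_0:
    has_new_data = check_for_new_data()
    with case(has_new_data, True):
        # Only runs when new data was found; otherwise this branch is skipped.
        start_parent()
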
Kevin Kho
You can raise a SKIP condition if there is nothing. You could also use case, I think.

Gabe Grand
04/12/2021, 4:52 PM
Is there a way for flow_0 to pass a parameter to flow_1 (e.g., the S3 location of the data that needs to get preprocessed)?

Kevin Kho
You can raise SKIP under an if clause to check if there is data: https://docs.prefect.io/core/concepts/states.html#skip-while-running
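(A minimal sketch of that pattern; the preprocess task and its input are hypothetical:)

from prefect import task
from prefect.engine.signals import SKIP

@task
def preprocess(records):
    # If the upstream check found nothing new, raise SKIP; by default the
    # SKIP state also propagates to downstream tasks in the same flow.
    if not records:
        raise SKIP("No new data to process")
    return [r.lower() for r in records]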

John Urbanik
04/12/2021, 8:08 PM
Could you call prefect.client.client.Client.get_flow_run_state in a task and use that to retrieve the result of flow_0, so as to avoid having to modify flow_0 to persist the data elsewhere? I can try it out this evening, but was wondering if someone knew offhand.

John Urbanik
04/12/2021, 10:47 PM
The issue is that flow_0 may be owned by another team, so changing the Result class / using templating isn't acceptable, and you might not actually know the location the data is stored in. I started playing around with it, and a few issues popped up:
1. StartFlowRun raises a signal on wait=True instead of returning a string as suggested in the documentation. I can get the run_id by doing a little string parsing, but it is messy.
2. I was wrong about the method; I need get_flow_run_info in order to actually get the state of the individual tasks.
3. This only gets the result locations for given task slugs. You can […]
Here is a minimal demo:
import prefect
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.results import PrefectResult

@task
def add(x, y):
    return x + y

@task
def get_results_from_flow_run_for_slugs(flow_run_signal, slugs):
    # StartFlowRun(wait=True) raises a signal; its state message begins with
    # the child flow-run id, so parse it out (messy -- see issue 1 above).
    client = prefect.Client()
    flow_run_id = flow_run_signal.state.message.split(" ")[0]
    flow_run_info = client.get_flow_run_info(flow_run_id)
    # Read back the persisted result for each requested task slug and
    # unwrap the underlying value.
    return {
        tr.task_slug: tr.state._result.read(tr.state._result.location).value
        for tr in flow_run_info.task_runs
        if tr.task_slug in slugs
    }

# flow_0 persists its results (PrefectResult stores them in Prefect's backend),
# which is what lets the parent flow read them back later.
with Flow("flow_0", result=PrefectResult()) as flow:
    result = add(3, 5, task_args=dict(name="get_this"))

flow.register(project_name="ju-testing")

first_flow = StartFlowRun(flow_name="flow_0", project_name="ju-testing", wait=True)

with Flow("parent", result=PrefectResult()) as p_flow:
    flow_run_signal = first_flow()
    res = get_results_from_flow_run_for_slugs(flow_run_signal, slugs=[result.slug])

p_flow.register(project_name="ju-testing")
If there is interest, I can probably make a few PRs to add a task to this effect (I'd really rather not be relying on this string splitting, though). I feel like there is probably a more elegant solution, so I won't be bothered if not!

John Urbanik
04/12/2021, 10:56 PM
It might make sense to extend StartFlowRun so that it wraps this logic inside.
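(A rough sketch of that idea, reusing the signal-parsing approach from the demo above; the subclass name and the slugs parameter are hypothetical:)

import prefect
from prefect.engine import signals
from prefect.tasks.prefect import StartFlowRun

class StartFlowRunWithResults(StartFlowRun):
    """Hypothetical subclass: start a child flow run, wait for it, then
    return the persisted results for the requested task slugs."""

    def __init__(self, slugs=None, **kwargs):
        self.slugs = slugs or []
        kwargs["wait"] = True
        super().__init__(**kwargs)

    def run(self, **kwargs):
        flow_run_id = None
        try:
            super().run(**kwargs)
        except signals.SUCCESS as exc:
            # As observed above, the signal's state message begins with the
            # child flow-run id.
            flow_run_id = exc.state.message.split(" ")[0]
        if flow_run_id is None:
            raise signals.FAIL("Child flow run did not finish successfully")
        client = prefect.Client()
        flow_run_info = client.get_flow_run_info(flow_run_id)
        return {
            tr.task_slug: tr.state._result.read(tr.state._result.location).value
            for tr in flow_run_info.task_runs
            if tr.task_slug in self.slugs
        }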

Gabe Grand
04/13/2021, 2:47 PM
It'd be really helpful to have StartFlowRun(..., wait=True) return a dict of per-task return values that can be passed to subsequent flows.

Gabe Grand
04/14/2021, 5:48 PM
Update: I changed step_1 to raise SKIP if there's no new data. This works well for skipping the remaining tasks within step_1 (I also took your suggestion to merge steps 0 + 1 to avoid having to pass data between them). However, within the flow-of-flows, when step_1 raises SKIP, the subsequent step_2 and step_3 pipelines still get run. I'm assuming it's because these are separate flows that get called with StartFlowRun. Is there any way to make these downstream flows respect the SKIP signal?

Kevin Kho
The SKIP is causing step_1 to be labelled a success. We had a recent change that will help us propagate the SKIP, but I need to test it out. Let me work on an example for you.

Gabe Grand
04/14/2021, 6:20 PM
Ideally, the StartFlowRun(…, wait=True) task could reflect the SKIP signal, though I'm obviously not sure how much complexity that will involve. If you have an example I can try out or a branch I can fetch, that'd be great.

Zanie
You could have a case statement check for the existence of the file.
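(A sketch of that approach in the parent flow, assuming a hypothetical file-existence check; with case, the downstream StartFlowRun tasks are simply skipped when the check is False, so there is no SKIP signal that needs to propagate across flows:)

import os

from prefect import Flow, case, task
from prefect.tasks.prefect import StartFlowRun

@task
def new_file_exists() -> bool:
    # Hypothetical: check wherever step_1 drops its output (local path, S3, ...).
    return os.path.exists("/data/incoming/latest.csv")

step_2 = StartFlowRun(flow_name="training", project_name="examples", wait=True)
step_3 = StartFlowRun(flow_name="deployment", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    exists = new_file_exists()
    with case(exists, True):
        r2 = step_2()
        r3 = step_3(upstream_tasks=[r2])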

Jeremy Tee
04/15/2021, 6:54 AM
I'm hitting an error: ("'Success' object has no attribute 'read'"). Any ideas on why?

Zanie
> it'd be really helpful to have StartFlowRun(..., wait=True) return a dict of per-task return values that can be passed to subsequent flows
This is not directly feasible because many of our customers have flow runs with >10,000 tasks, which we could not feasibly collect and return like that.

John Urbanik
04/15/2021, 2:25 PM
Maybe you could pass StartFlowRun a list of tasks/slugs that you want to return the results for. Internally, maybe they're mapped over with tasks so that you can parallelize the requests if it is still a relatively large number.
Another option would be to add another form of key task. At my company, we're overloading reference_tasks as a way to get the set of tasks to pass to the next flow in our sequence.

Zanie
The FlowRunner has an idea of return_tasks that's basically unused when running flows with the API.